package spark

import java.io.{ObjectOutputStream, IOException, EOFException, ObjectInputStream}
import java.net.URL
import java.util.concurrent.atomic.AtomicLong
import java.util.Random
import java.util.Date
import java.util.{HashMap => JHashMap}

import scala.collection.mutable.ArrayBuffer
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions.mapAsScalaMap

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.hadoop.mapred.HadoopWriter
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputCommitter
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.hadoop.mapred.TextOutputFormat

import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}

import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
import spark.partial.GroupedCountEvaluator
import spark.partial.PartialResult

import spark.rdd.BlockRDD
import spark.rdd.CartesianRDD
import spark.rdd.FilteredRDD
import spark.rdd.FlatMappedRDD
import spark.rdd.GlommedRDD
import spark.rdd.MappedRDD
import spark.rdd.MapPartitionsRDD
import spark.rdd.MapPartitionsWithSplitRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
import spark.rdd.UnionRDD

import spark.storage.StorageLevel

import SparkContext._

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
 * [[spark.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such
 * as `groupByKey` and `join`; [[spark.DoubleRDDFunctions]] contains operations available only on
 * RDDs of Doubles; and [[spark.SequenceFileRDDFunctions]] contains operations available on RDDs
 * that can be saved as SequenceFiles. These operations are automatically available on any RDD of
 * the right type (e.g. RDD[(Int, Int)]) through implicit conversions when you
 * `import spark.SparkContext._`.
 *
 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of splits (partitions)
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
 * on RDD internals.
 */
abstract class RDD[T: ClassManifest](
    @transient var sc: SparkContext,
    var dependencies_ : List[Dependency[_]]
  ) extends Serializable {

  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

  // Methods that must be implemented by subclasses:

  /** Set of partitions in this RDD. */
  def splits: Array[Split]

  /** Function for computing a given partition. */
  def compute(split: Split): Iterator[T]

  /** How this RDD depends on any parent RDDs. */
  def dependencies: List[Dependency[_]] = dependencies_

  /** Record user function generating this RDD. */
  private[spark] val origin = Utils.getSparkCallSite

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  val partitioner: Option[Partitioner] = None

  /** Optionally overridden by subclasses to specify placement preferences. */
  def preferredLocations(split: Split): Seq[String] = {
    if (isCheckpointed) {
      checkpointRDD.preferredLocations(split)
    } else {
      Nil
    }
  }

  /** The [[spark.SparkContext]] that this RDD was created on. */
  def context = sc

  private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]

  /** A unique ID for this RDD (within its SparkContext). */
  val id = sc.newRddId()

  // Variables relating to persistence
  private var storageLevel: StorageLevel = StorageLevel.NONE

  /** Returns the first parent RDD */
  protected[spark] def firstParent[U: ClassManifest] = {
    dependencies.head.rdd.asInstanceOf[RDD[U]]
  }

  /** Returns the `i`-th parent RDD */
  protected[spark] def parent[U: ClassManifest](i: Int) = dependencies(i).rdd.asInstanceOf[RDD[U]]

  // Variables relating to checkpointing
  protected val isCheckpointable = true  // override and set to false to avoid checkpointing an RDD

  protected var shouldCheckpoint = false  // set to true when an RDD is marked for checkpointing
  protected var isCheckpointInProgress = false  // set to true when checkpointing is in progress
  protected[spark] var isCheckpointed = false  // set to true after checkpointing is completed

  protected[spark] var checkpointFile: String = null  // set to the checkpoint file after checkpointing is completed
  protected var checkpointRDD: RDD[T] = null  // set to the HadoopRDD of the checkpoint file
  protected var checkpointRDDSplits: Seq[Split] = null  // set to the splits of the Hadoop RDD

  // Methods available on all RDDs:

  /**
   * Set this RDD's storage level to persist its values across operations after the first time
   * it is computed. Can only be called once on each RDD.
   */
  def persist(newLevel: StorageLevel): RDD[T] = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    storageLevel = newLevel
    this
  }

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def cache(): RDD[T] = persist()
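
  // Usage sketch (illustrative comment, not part of the original source; assumes a
  // SparkContext `sc` is in scope). Caching keeps the computed data around for re-use:
  //   val nums = sc.parallelize(1 to 1000000).cache()
  //   nums.count()          // first action computes the RDD and stores it in memory
  //   nums.reduce(_ + _)    // later actions read the cached copy instead of recomputing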

  /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
  def getStorageLevel = storageLevel

  /**
   * Mark this RDD for checkpointing. The RDD will be saved to a file inside `checkpointDir`
   * (set using setCheckpointDir()) and all references to its parent RDDs will be removed.
   * This is used to truncate very long lineages. In the current implementation, Spark will save
   * this RDD to a file (using saveAsObjectFile()) after the first job using this RDD is done.
   * Hence, it is strongly recommended to call checkpoint() on RDDs only when
   * (i) checkpoint() is called before any job has been executed on this RDD, and
   * (ii) this RDD has been made to persist in memory. Otherwise saving it to a file will
   * require recomputation.
   */
  protected[spark] def checkpoint() {
    synchronized {
      if (isCheckpointed || shouldCheckpoint || isCheckpointInProgress) {
        // do nothing
      } else if (isCheckpointable) {
        if (sc.checkpointDir == null) {
          throw new Exception("Checkpoint directory has not been set in the SparkContext.")
        }
        shouldCheckpoint = true
      } else {
        throw new Exception(this + " cannot be checkpointed")
      }
    }
  }

  def getCheckpointData(): Any = {
    synchronized {
      if (isCheckpointed) {
        checkpointFile
      } else {
        null
      }
    }
  }

  /**
   * Performs the checkpointing of this RDD by saving it to a file. It is called by the
   * DAGScheduler after a job using this RDD has completed (therefore the RDD has been
   * materialized and potentially stored in memory). In case this RDD is not marked for
   * checkpointing, doCheckpoint() is called recursively on the parent RDDs.
   */
  private[spark] def doCheckpoint() {
    val startCheckpoint = synchronized {
      if (isCheckpointable && shouldCheckpoint && !isCheckpointInProgress) {
        isCheckpointInProgress = true
        true
      } else {
        false
      }
    }

    if (startCheckpoint) {
      val rdd = this
      val env = SparkEnv.get

      // Spawn a new thread to do the checkpoint, as it takes some time to write the RDD to a file
      val th = new Thread() {
        override def run() {
          // Save the RDD to a file, create a new HadoopRDD from it,
          // and change the dependencies from the original parents to the new RDD
          SparkEnv.set(env)
          rdd.checkpointFile = new Path(context.checkpointDir, "rdd-" + id).toString
          rdd.saveAsObjectFile(checkpointFile)
          rdd.synchronized {
            rdd.checkpointRDD = context.objectFile[T](checkpointFile)
            rdd.checkpointRDDSplits = rdd.checkpointRDD.splits
            rdd.changeDependencies(rdd.checkpointRDD)
            rdd.shouldCheckpoint = false
            rdd.isCheckpointInProgress = false
            rdd.isCheckpointed = true
          }
        }
      }
      th.start()
    } else {
      // Recursively call doCheckpoint() on parent RDDs in case any of them are marked
      dependencies.foreach(_.rdd.doCheckpoint())
    }
  }

  /**
   * Changes the dependencies of this RDD from its original parents to the new
   * [[spark.rdd.HadoopRDD]] (`newRDD`) created from the checkpoint file. This method must ensure
   * that all references to the original parent RDDs are removed, so that the parent RDDs can be
   * garbage collected. Subclasses of RDD may override this method to implement their own changing
   * logic. See [[spark.rdd.UnionRDD]] and [[spark.rdd.ShuffledRDD]] to get a better idea.
   */
  protected def changeDependencies(newRDD: RDD[_]) {
    dependencies_ = List(new OneToOneDependency(newRDD))
  }

  /**
   * Internal iterator method for this RDD; will read from cache if applicable, or otherwise
   * compute the split. This should ''not'' be called by users directly, but is available for
   * implementors of custom subclasses of RDD.
   */
  final def iterator(split: Split): Iterator[T] = {
    if (isCheckpointed) {
      // ASSUMPTION: the checkpoint Hadoop RDD has the same number of splits as the original
      checkpointRDD.iterator(checkpointRDDSplits(split.index))
    } else if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheTracker.getOrCompute[T](this, split, storageLevel)
    } else {
      compute(split)
    }
  }

  // Transformations (return a new RDD)

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
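
  // Usage sketch (illustrative comment, not part of the original source; assumes `sc`):
  //   val squares = sc.parallelize(1 to 5).map(x => x * x)   // RDD of 1, 4, 9, 16, 25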

  /**
   * Return a new RDD by first applying a function to all elements of this
   * RDD, and then flattening the results.
   */
  def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] =
    new FlatMappedRDD(this, sc.clean(f))
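
  // Usage sketch (illustrative comment, not part of the original source; assumes `sc`):
  //   val words = sc.parallelize(Seq("a b", "c d")).flatMap(_.split(" "))   // RDD of "a", "b", "c", "d"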

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numSplits: Int = splits.size): RDD[T] =
    map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1)
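
  // Usage sketch (illustrative comment, not part of the original source; assumes `sc`):
  //   sc.parallelize(Seq(1, 1, 2, 3, 3)).distinct().collect()   // Array(1, 2, 3), in some order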

  /**
   * Return a sampled subset of this RDD.
   */
  def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
    new SampledRDD(this, withReplacement, fraction, seed)

  def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
    var fraction = 0.0
    var total = 0
    val multiplier = 3.0
    val initialCount = count()
    var maxSelected = 0

    if (initialCount > Integer.MAX_VALUE - 1) {
      maxSelected = Integer.MAX_VALUE - 1
    } else {
      maxSelected = initialCount.toInt
    }

    if (num > initialCount) {
      total = maxSelected
      fraction = math.min(multiplier * (maxSelected + 1) / initialCount, 1.0)
    } else if (num < 0) {
      throw new IllegalArgumentException("Negative number of elements requested")
    } else {
      fraction = math.min(multiplier * (num + 1) / initialCount, 1.0)
      total = num
    }

    val rand = new Random(seed)
    var samples = this.sample(withReplacement, fraction, rand.nextInt).collect()

    while (samples.length < total) {
      samples = this.sample(withReplacement, fraction, rand.nextInt).collect()
    }

    Utils.randomizeInPlace(samples, rand).take(total)
  }

  /**
   * Return the union of this RDD and another one. Any identical elements will appear multiple
   * times (use `.distinct()` to eliminate them).
   */
  def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))

  /**
   * Return the union of this RDD and another one. Any identical elements will appear multiple
   * times (use `.distinct()` to eliminate them).
   */
  def ++(other: RDD[T]): RDD[T] = this.union(other)

  /**
   * Return an RDD created by coalescing all elements within each partition into an array.
   */
  def glom(): RDD[Array[T]] = new GlommedRDD(this)

  /**
   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
   * elements (a, b) where a is in `this` and b is in `other`.
   */
  def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)

  /**
   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
   * mapping to that key.
   */
  def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
    val cleanF = sc.clean(f)
    this.map(t => (cleanF(t), t)).groupByKey(numSplits)
  }
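
  // Usage sketch (illustrative comment, not part of the original source; assumes `sc`):
  //   sc.parallelize(1 to 6).groupBy(_ % 2).collect()
  //   // => pairs such as (0, Seq(2, 4, 6)) and (1, Seq(1, 3, 5)), ordering not guaranteed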

  /**
   * Return an RDD of grouped items.
   */
  def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism)

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
  def pipe(command: String): RDD[String] = new PipedRDD(this, command)

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
  def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command)

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
  def pipe(command: Seq[String], env: Map[String, String]): RDD[String] =
    new PipedRDD(this, command, env)
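
  // Usage sketch (illustrative comment, not part of the original source; assumes `sc` and that
  // the external command is available on every worker; elements are piped one per line):
  //   val upper = sc.parallelize(Seq("a", "b")).pipe("tr a-z A-Z")   // RDD of "A", "B"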

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] =
    new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)
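
  // Usage sketch (illustrative comment, not part of the original source; assumes an RDD `nums`):
  //   val partitionSizes = nums.mapPartitions(iter => Iterator(iter.size))   // one count per partition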

  /**
   * Return a new RDD by applying a function to each partition of this RDD, while tracking the
   * index of the original partition.
   */
  def mapPartitionsWithSplit[U: ClassManifest](f: (Int, Iterator[T]) => Iterator[U]): RDD[U] =
    new MapPartitionsWithSplitRDD(this, sc.clean(f))
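
  // Usage sketch (illustrative comment, not part of the original source; assumes an RDD `nums`):
  //   val tagged = nums.mapPartitionsWithSplit((index, iter) => iter.map(x => (index, x)))
  //   // pairs each element with the index of the partition it came from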

  // Actions (launch a job to return a value to the user program)

  /**
   * Applies a function f to all elements of this RDD.
   */
  def foreach(f: T => Unit) {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

  /**
   * Return an array that contains all of the elements in this RDD.
   */
  def collect(): Array[T] = {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

  /**
   * Return an array that contains all of the elements in this RDD.
   */
  def toArray(): Array[T] = collect()

  /**
   * Reduces the elements of this RDD using the specified associative binary operator.
   */
  def reduce(f: (T, T) => T): T = {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    val options = sc.runJob(this, reducePartition)
    val results = new ArrayBuffer[T]
    for (opt <- options; elem <- opt) {
      results += elem
    }
    if (results.size == 0) {
      throw new UnsupportedOperationException("empty collection")
    } else {
      return results.reduceLeft(cleanF)
    }
  }
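
  // Usage sketch (illustrative comment, not part of the original source; assumes `sc`):
  //   sc.parallelize(1 to 100).reduce(_ + _)   // 5050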

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using a
   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
   * modify t1 and return it as its result value to avoid object allocation; however, it should not
   * modify t2.
   */
  def fold(zeroValue: T)(op: (T, T) => T): T = {
    val cleanOp = sc.clean(op)
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp))
    return results.fold(zeroValue)(cleanOp)
  }
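
  // Usage sketch (illustrative comment, not part of the original source; assumes `sc`). Note that
  // the zero value is applied within each partition and again when merging partition results:
  //   sc.parallelize(1 to 4).fold(0)(_ + _)   // 10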

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using
   * the given combine functions and a neutral "zero value". This function can return a different
   * result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into
   * a U and one operation for merging two U's, as in scala.TraversableOnce. Both of these
   * functions are allowed to modify and return their first argument instead of creating a new U
   * to avoid memory allocation.
   */
  def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val results = sc.runJob(this,
      (iter: Iterator[T]) => iter.aggregate(zeroValue)(cleanSeqOp, cleanCombOp))
    return results.fold(zeroValue)(cleanCombOp)
  }
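
  // Usage sketch (illustrative comment, not part of the original source; assumes `sc`).
  // Computes the sum and the count of the elements in a single pass:
  //   sc.parallelize(1 to 4).aggregate((0, 0))(
  //     (acc, x) => (acc._1 + x, acc._2 + 1),
  //     (a, b) => (a._1 + b._1, a._2 + b._2))   // (10, 4)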

  /**
   * Return the number of elements in the RDD.
   */
  def count(): Long = {
    sc.runJob(this, (iter: Iterator[T]) => {
      var result = 0L
      while (iter.hasNext) {
        result += 1L
        iter.next
      }
      result
    }).sum
  }

  /**
   * (Experimental) Approximate version of count() that returns a potentially incomplete result
   * within a timeout, even if not all tasks have finished.
   */
  def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
    val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
      var result = 0L
      while (iter.hasNext) {
        result += 1L
        iter.next
      }
      result
    }
    val evaluator = new CountEvaluator(splits.size, confidence)
    sc.runApproximateJob(this, countElements, evaluator, timeout)
  }

  /**
   * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
   * combine step happens locally on the master, equivalent to running a single reduce task.
   */
  def countByValue(): Map[T, Long] = {
    // TODO: This should perhaps be distributed by default.
    def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = {
      val map = new OLMap[T]
      while (iter.hasNext) {
        val v = iter.next()
        map.put(v, map.getLong(v) + 1L)
      }
      Iterator(map)
    }
    def mergeMaps(m1: OLMap[T], m2: OLMap[T]): OLMap[T] = {
      val iter = m2.object2LongEntrySet.fastIterator()
      while (iter.hasNext) {
        val entry = iter.next()
        m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
      }
      return m1
    }
    val myResult = mapPartitions(countPartition).reduce(mergeMaps)
    myResult.asInstanceOf[java.util.Map[T, Long]]  // Will be wrapped as a Scala mutable Map
  }
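
  // Usage sketch (illustrative comment, not part of the original source; assumes `sc`):
  //   sc.parallelize(Seq("a", "b", "a")).countByValue()   // Map("a" -> 2, "b" -> 1)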

  /**
   * (Experimental) Approximate version of countByValue().
   */
  def countByValueApprox(
      timeout: Long,
      confidence: Double = 0.95
    ): PartialResult[Map[T, BoundedDouble]] = {
    val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
      val map = new OLMap[T]
      while (iter.hasNext) {
        val v = iter.next()
        map.put(v, map.getLong(v) + 1L)
      }
      map
    }
    val evaluator = new GroupedCountEvaluator[T](splits.size, confidence)
    sc.runApproximateJob(this, countPartition, evaluator, timeout)
  }

  /**
   * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
   * it will be slow if a lot of partitions are required. In that case, use collect() to get the
   * whole RDD instead.
   */
  def take(num: Int): Array[T] = {
    if (num == 0) {
      return new Array[T](0)
    }
    val buf = new ArrayBuffer[T]
    var p = 0
    while (buf.size < num && p < splits.size) {
      val left = num - buf.size
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true)
      buf ++= res(0)
      if (buf.size == num)
        return buf.toArray
      p += 1
    }
    return buf.toArray
  }
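
  // Usage sketch (illustrative comment, not part of the original source; assumes `sc`):
  //   sc.parallelize(1 to 100, 10).take(3)   // Array(1, 2, 3)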

  /**
   * Return the first element in this RDD.
   */
  def first(): T = take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }

  /**
   * Save this RDD as a text file, using string representations of elements.
   */
  def saveAsTextFile(path: String) {
    this.map(x => (NullWritable.get(), new Text(x.toString)))
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }

  /**
   * Save this RDD as a SequenceFile of serialized objects.
   */
  def saveAsObjectFile(path: String) {
    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
      .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
      .saveAsSequenceFile(path)
  }

  /** A private method for tests, to look at the contents of each partition */
  private[spark] def collectPartitions(): Array[Array[T]] = {
    sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  }

  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream) {
    synchronized {
      oos.defaultWriteObject()
    }
  }

  @throws(classOf[IOException])
  private def readObject(ois: ObjectInputStream) {
    synchronized {
      ois.defaultReadObject()
    }
  }
}