Merge branch 'dev' of github.com:radlab/spark into dev

Tathagata Das 2012-09-17 14:26:06 -07:00
commit 6d1fe02685
6 changed files with 52 additions and 35 deletions

NewHadoopRDD.scala

@@ -28,7 +28,9 @@ class NewHadoopRDD[K, V](
     @transient conf: Configuration)
   extends RDD[(K, V)](sc) {
-  private val serializableConf = new SerializableWritable(conf)
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+  // private val serializableConf = new SerializableWritable(conf)
   private val jobtrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -41,7 +43,7 @@ class NewHadoopRDD[K, V](
   @transient
   private val splits_ : Array[Split] = {
     val inputFormat = inputFormatClass.newInstance
-    val jobContext = new JobContext(serializableConf.value, jobId)
+    val jobContext = new JobContext(conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Split](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
@@ -54,9 +56,9 @@ class NewHadoopRDD[K, V](
   override def compute(theSplit: Split) = new Iterator[(K, V)] {
     val split = theSplit.asInstanceOf[NewHadoopSplit]
-    val conf = serializableConf.value
+    val conf = confBroadcast.value.value
     val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
-    val context = new TaskAttemptContext(serializableConf.value, attemptId)
+    val context = new TaskAttemptContext(conf, attemptId)
     val format = inputFormatClass.newInstance
     val reader = format.createRecordReader(split.serializableHadoopSplit.value, context)
     reader.initialize(split.serializableHadoopSplit.value, context)
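The three hunks above stop re-serializing the Hadoop Configuration with every task and instead broadcast it once per job. Below is a minimal, self-contained sketch of that pattern, not taken from this commit, written against the 2012-era `spark` package API that the diff itself uses; the object name and the `fs.default.name` lookup are illustrative only.

import org.apache.hadoop.conf.Configuration
import spark.{SerializableWritable, SparkContext}

object BroadcastConfSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "BroadcastConfSketch")

    // A Configuration is Writable but not Serializable, and can weigh ~10 KB,
    // so wrap it once and broadcast it instead of capturing it in every task closure.
    val conf = new Configuration()
    val confBroadcast = sc.broadcast(new SerializableWritable(conf))

    val defaults = sc.parallelize(1 to 4, 4).map { i =>
      // Unwrap on the worker: .value on the broadcast, then .value on the wrapper.
      val localConf = confBroadcast.value.value
      (i, localConf.get("fs.default.name", "file:///"))
    }
    defaults.collect().foreach(println)
    sc.stop()
  }
}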

DStream.scala

@@ -284,9 +284,8 @@ extends Logging with Serializable {
 }
-abstract class InputDStream[T: ClassManifest] (
-    @transient ssc: StreamingContext)
-extends DStream[T](ssc) {
+abstract class InputDStream[T: ClassManifest] (@transient ssc: StreamingContext)
+  extends DStream[T](ssc) {
   override def dependencies = List()
@@ -303,9 +302,9 @@ extends DStream[T](ssc) {
  */
 class MappedDStream[T: ClassManifest, U: ClassManifest] (
-    parent: DStream[T],
-    mapFunc: T => U)
-extends DStream[U](parent.ssc) {
+    @transient parent: DStream[T],
+    mapFunc: T => U
+  ) extends DStream[U](parent.ssc) {
   override def dependencies = List(parent)
@@ -322,9 +321,9 @@ extends DStream[U](parent.ssc) {
  */
 class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
-    parent: DStream[T],
-    flatMapFunc: T => Traversable[U])
-extends DStream[U](parent.ssc) {
+    @transient parent: DStream[T],
+    flatMapFunc: T => Traversable[U]
+  ) extends DStream[U](parent.ssc) {
   override def dependencies = List(parent)
@@ -340,8 +339,10 @@ extends DStream[U](parent.ssc) {
  * TODO
  */
-class FilteredDStream[T: ClassManifest](parent: DStream[T], filterFunc: T => Boolean)
-extends DStream[T](parent.ssc) {
+class FilteredDStream[T: ClassManifest](
+    @transient parent: DStream[T],
+    filterFunc: T => Boolean
+  ) extends DStream[T](parent.ssc) {
   override def dependencies = List(parent)
@@ -358,9 +359,9 @@ extends DStream[T](parent.ssc) {
  */
 class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
-    parent: DStream[T],
-    mapPartFunc: Iterator[T] => Iterator[U])
-extends DStream[U](parent.ssc) {
+    @transient parent: DStream[T],
+    mapPartFunc: Iterator[T] => Iterator[U]
+  ) extends DStream[U](parent.ssc) {
   override def dependencies = List(parent)
@@ -376,7 +377,8 @@ extends DStream[U](parent.ssc) {
  * TODO
  */
-class GlommedDStream[T: ClassManifest](parent: DStream[T]) extends DStream[Array[T]](parent.ssc) {
+class GlommedDStream[T: ClassManifest](@transient parent: DStream[T])
+  extends DStream[Array[T]](parent.ssc) {
   override def dependencies = List(parent)
@@ -393,7 +395,7 @@ class GlommedDStream[T: ClassManifest](parent: DStream[T]) extends DStream[Array
  */
 class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
-    parent: DStream[(K,V)],
+    @transient parent: DStream[(K,V)],
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiner: (C, C) => C,
@@ -418,7 +420,7 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
  * TODO
  */
-class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]])
+class UnifiedDStream[T: ClassManifest](@transient parents: Array[DStream[T]])
   extends DStream[T](parents(0).ssc) {
   if (parents.length == 0) {
@@ -457,7 +459,7 @@ class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]])
  */
 class PerElementForEachDStream[T: ClassManifest] (
-    parent: DStream[T],
+    @transient parent: DStream[T],
     foreachFunc: T => Unit
   ) extends DStream[Unit](parent.ssc) {
@@ -488,7 +490,7 @@ class PerElementForEachDStream[T: ClassManifest] (
  */
 class PerRDDForEachDStream[T: ClassManifest] (
-    parent: DStream[T],
+    @transient parent: DStream[T],
     foreachFunc: (RDD[T], Time) => Unit
   ) extends DStream[Unit](parent.ssc) {
@@ -516,7 +518,7 @@ class PerRDDForEachDStream[T: ClassManifest] (
  */
 class TransformedDStream[T: ClassManifest, U: ClassManifest] (
-    parent: DStream[T],
+    @transient parent: DStream[T],
     transformFunc: (RDD[T], Time) => RDD[U]
   ) extends DStream[U](parent.ssc) {
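Each constructor hunk above marks the parent stream `@transient` so that serializing a derived DStream does not also serialize its parent. The plain-Scala sketch below (not Spark code; `Parent`, `Child`, and `roundTrip` are invented names) shows the underlying behaviour: a `@transient` field is simply skipped by Java serialization and comes back as null.

import java.io._

class Parent(val payload: Array[Byte]) extends Serializable
class Child(@transient val parent: Parent, val name: String) extends Serializable

object TransientSketch {
  // Serialize to a byte array and read the object back.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)).readObject().asInstanceOf[T]
  }

  def main(args: Array[String]) {
    val child = new Child(new Parent(new Array[Byte](10 * 1024 * 1024)), "mapped")
    val copy = roundTrip(child)
    println(copy.name)            // "mapped": ordinary field, serialized as usual
    println(copy.parent == null)  // true: the @transient parent was not written out
  }
}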

QueueInputDStream.scala

@@ -25,7 +25,11 @@ class QueueInputDStream[T: ClassManifest](
       buffer ++= queue
     }
     if (buffer.size > 0) {
-      Some(new UnionRDD(ssc.sc, buffer.toSeq))
+      if (oneAtATime) {
+        Some(buffer.first)
+      } else {
+        Some(new UnionRDD(ssc.sc, buffer.toSeq))
+      }
     } else if (defaultRDD != null) {
       Some(defaultRDD)
     } else {
@@ -33,4 +37,4 @@ class QueueInputDStream[T: ClassManifest](
     }
   }
-}
\ No newline at end of file
+}
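The hunk above makes the queue stream honour `oneAtATime`: consume a single queued RDD per batch interval, or fold everything currently queued into one UnionRDD. A rough, Spark-free sketch of that selection logic follows; `nextBatch` is an invented helper and plain `Seq`s stand in for RDDs.

import scala.collection.mutable.{ArrayBuffer, Queue}

object QueueSelectionSketch {
  def nextBatch[T](queue: Queue[Seq[T]], oneAtATime: Boolean,
                   default: Seq[T] = null): Option[Seq[T]] = {
    val buffer = new ArrayBuffer[Seq[T]]()
    if (oneAtATime && queue.size > 0) {
      buffer += queue.dequeue()   // take exactly one queued batch
    } else {
      buffer ++= queue            // take everything currently queued
    }
    if (buffer.size > 0) {
      if (oneAtATime) Some(buffer.head)   // one item per interval
      else Some(buffer.flatten)           // stand-in for new UnionRDD(...)
    } else if (default != null) {
      Some(default)
    } else {
      None
    }
  }

  def main(args: Array[String]) {
    val q = Queue(Seq(1, 2), Seq(3, 4), Seq(5))
    println(nextBatch(q, oneAtATime = true))   // Some(List(1, 2))
    println(nextBatch(q, oneAtATime = false))  // Some(ArrayBuffer(3, 4, 5))
  }
}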

Scheduler.scala

@@ -26,7 +26,6 @@ extends Logging {
  val timer = new RecurringTimer(clock, ssc.batchDuration, generateRDDs(_))
  def start() {
    val zeroTime = Time(timer.start())
    outputStreams.foreach(_.initialize(zeroTime))
    inputStreams.par.foreach(_.start())
@@ -40,6 +39,7 @@ extends Logging {
  }
  def generateRDDs (time: Time) {
    SparkEnv.set(ssc.env)
    logInfo("\n-----------------------------------------------------\n")
    logInfo("Generating RDDs for time " + time)
    outputStreams.foreach(outputStream => {
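For context, the scheduler above drives batch generation by handing `generateRDDs(_)` to a RecurringTimer that fires every `batchDuration`. The sketch below imitates that wiring with a plain `java.util.Timer`; it is not Spark's RecurringTimer, and `generateBatch` is a stand-in for the real callback.

import java.util.{Timer, TimerTask}

object RecurringTimerSketch {
  def main(args: Array[String]) {
    val batchDurationMs = 1000L

    // Stand-in for Scheduler.generateRDDs(time): receives the batch time on every tick.
    def generateBatch(time: Long) {
      println("Generating RDDs for time " + time)
    }

    val timer = new Timer("batch-generator", true)   // daemon timer thread
    timer.scheduleAtFixedRate(new TimerTask {
      def run() { generateBatch(System.currentTimeMillis()) }
    }, 0L, batchDurationMs)

    Thread.sleep(3500)   // let three or four batches fire, then stop
    timer.cancel()
  }
}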

StateDStream.scala

@@ -8,7 +8,7 @@ import spark.SparkContext._
 import spark.storage.StorageLevel
 class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
-    parent: DStream[(K, V)],
+    @transient parent: DStream[(K, V)],
     updateFunc: (Iterator[(K, Seq[V], S)]) => Iterator[(K, S)],
     partitioner: Partitioner,
     rememberPartitioner: Boolean
@@ -26,14 +26,14 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
   override def getOrCompute(time: Time): Option[RDD[(K, S)]] = {
     generatedRDDs.get(time) match {
       case Some(oldRDD) => {
-        if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval) && oldRDD.dependencies.size > 0) {
+        if (checkpointInterval != null && time > zeroTime && (time - zeroTime).isMultipleOf(checkpointInterval) && oldRDD.dependencies.size > 0) {
           val r = oldRDD
           val oldRDDBlockIds = oldRDD.splits.map(s => "rdd:" + r.id + ":" + s.index)
           val checkpointedRDD = new BlockRDD[(K, S)](ssc.sc, oldRDDBlockIds) {
             override val partitioner = oldRDD.partitioner
           }
           generatedRDDs.update(time, checkpointedRDD)
-          logInfo("Updated RDD of time " + time + " with its checkpointed version")
+          logInfo("Checkpointed RDD " + oldRDD.id + " of time " + time + " with its new RDD " + checkpointedRDD.id)
           Some(checkpointedRDD)
         } else {
           Some(oldRDD)
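The tightened condition above only replaces a generated RDD with its checkpointed BlockRDD when the batch time is strictly later than `zeroTime` and lands on the checkpoint interval. A small stand-alone sketch of that guard, with a toy `Time` class standing in for Spark Streaming's, is below.

object CheckpointGuardSketch {
  // Toy stand-in for spark.streaming.Time, just enough for the guard.
  case class Time(millis: Long) {
    def -(that: Time) = Time(millis - that.millis)
    def >(that: Time) = millis > that.millis
    def isMultipleOf(that: Time) = that.millis > 0 && millis % that.millis == 0
  }

  def shouldCheckpoint(time: Time, zeroTime: Time, checkpointInterval: Time,
                       numDependencies: Int): Boolean = {
    checkpointInterval != null &&
      time > zeroTime &&
      (time - zeroTime).isMultipleOf(checkpointInterval) &&
      numDependencies > 0
  }

  def main(args: Array[String]) {
    val zero = Time(0)
    val interval = Time(2000)
    println(shouldCheckpoint(Time(0), zero, interval, 1))     // false: not strictly after zeroTime
    println(shouldCheckpoint(Time(4000), zero, interval, 1))  // true: after zeroTime, on the interval
    println(shouldCheckpoint(Time(5000), zero, interval, 1))  // false: not a multiple of the interval
  }
}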

TopKWordCountRaw.scala

@@ -1,11 +1,24 @@
 package spark.streaming.examples
 import spark.util.IntParam
+import spark.SparkContext
+import spark.SparkContext._
 import spark.storage.StorageLevel
 import spark.streaming._
 import spark.streaming.StreamingContext._
+import WordCount2_ExtraFunctions._
 object TopKWordCountRaw {
+  def moreWarmup(sc: SparkContext) {
+    (0 until 40).foreach {i =>
+      sc.parallelize(1 to 20000000, 1000)
+        .map(_ % 1331).map(_.toString)
+        .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
+        .collect()
+    }
+  }
   def main(args: Array[String]) {
     if (args.length != 7) {
       System.err.println("Usage: TopKWordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>")
@@ -20,16 +33,12 @@ object TopKWordCountRaw {
     ssc.setBatchDuration(Milliseconds(batchMs))
     // Make sure some tasks have started on each node
-    ssc.sc.parallelize(1 to 1000, 1000).count()
-    ssc.sc.parallelize(1 to 1000, 1000).count()
-    ssc.sc.parallelize(1 to 1000, 1000).count()
+    moreWarmup(ssc.sc)
     val rawStreams = (1 to streams).map(_ =>
       ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
     val union = new UnifiedDStream(rawStreams)
-    import WordCount2_ExtraFunctions._
     val windowedCounts = union.mapPartitions(splitAndCountPartitions)
       .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
     windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,