Refactored a whole lot to push all DStreams into the spark.streaming.dstream package.

Tathagata Das 2012-12-30 15:19:55 -08:00
parent 9e644402c1
commit 7e0271b438
41 changed files with 402 additions and 333 deletions
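
Since the classes only changed packages, downstream code mostly needs updated imports or, preferably, the StreamingContext helpers. A hedged sketch of both options, using only calls visible in the examples updated further down (object and method names here are hypothetical):

// Sketch only: assumes the 0.6-era streaming API shown in this diff.
import spark.storage.StorageLevel
import spark.streaming.StreamingContext
import spark.streaming.dstream.UnionDStream   // new home of the class after this commit

object UnionAfterRefactor {
  def unifyRawStreams(ssc: StreamingContext, host: String, port: Int, numStreams: Int) = {
    val raw = (1 to numStreams).map(_ =>
      ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
    val viaClass  = new UnionDStream(raw)   // direct construction still works from the new package
    val viaHelper = ssc.union(raw)          // the helper keeps callers independent of the move
    viaHelper
  }
}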

View file

@ -1,6 +1,7 @@
package spark.rdd
import spark.{Partitioner, RDD, SparkEnv, ShuffleDependency, Split, TaskContext}
import spark.SparkContext._
private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
override val index = idx

View file

@ -1,17 +1,15 @@
package spark.streaming
import spark.streaming.dstream._
import StreamingContext._
import Time._
import spark._
import spark.SparkContext._
import spark.rdd._
import spark.{RDD, Logging}
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import java.util.concurrent.ArrayBlockingQueue
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.hadoop.fs.Path
@ -197,7 +195,7 @@ abstract class DStream[T: ClassManifest] (
"than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
"delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
"the Java property 'spark.cleaner.delay' to more than " +
math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes."
math.ceil(rememberDuration.milliseconds.toDouble / 60000.0).toInt + " minutes."
)
dependencies.foreach(_.validate())
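
To make the arithmetic in that warning concrete, a standalone mirror of the check (all values hypothetical; it only reproduces the ceil/60000 and /60.0 conversions used above):

// Standalone sketch of the validation arithmetic; not part of the commit.
object CleanerDelayCheck {
  def main(args: Array[String]) {
    val rememberDurationMs      = 30 * 60 * 1000L   // stream must remember 30 minutes of RDDs
    val metadataCleanerDelaySec = 35 * 60           // hypothetical cleaner delay of 35 minutes, in seconds
    val requiredMinutes = math.ceil(rememberDurationMs.toDouble / 60000.0).toInt   // 30
    val actualMinutes   = metadataCleanerDelaySec / 60.0                           // 35.0
    assert(actualMinutes > requiredMinutes,
      "'spark.cleaner.delay' must allow more than " + requiredMinutes + " minutes")
  }
}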
@ -642,271 +640,3 @@ abstract class DStream[T: ClassManifest] (
private[streaming]
case class DStreamCheckpointData(rdds: HashMap[Time, Any])
abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
extends DStream[T](ssc_) {
override def dependencies = List()
override def slideTime = {
if (ssc == null) throw new Exception("ssc is null")
if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
ssc.graph.batchDuration
}
def start()
def stop()
}
/**
* TODO
*/
private[streaming]
class MappedDStream[T: ClassManifest, U: ClassManifest] (
parent: DStream[T],
mapFunc: T => U
) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}
}
/**
* TODO
*/
private[streaming]
class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
parent: DStream[T],
flatMapFunc: T => Traversable[U]
) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
}
/**
* TODO
*/
private[streaming]
class FilteredDStream[T: ClassManifest](
parent: DStream[T],
filterFunc: T => Boolean
) extends DStream[T](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[T]] = {
parent.getOrCompute(validTime).map(_.filter(filterFunc))
}
}
/**
* TODO
*/
private[streaming]
class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
parent: DStream[T],
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean
) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
}
}
/**
* TODO
*/
private[streaming]
class GlommedDStream[T: ClassManifest](parent: DStream[T])
extends DStream[Array[T]](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[Array[T]]] = {
parent.getOrCompute(validTime).map(_.glom())
}
}
/**
* TODO
*/
private[streaming]
class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
parent: DStream[(K,V)],
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
partitioner: Partitioner
) extends DStream [(K,C)] (parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[(K,C)]] = {
parent.getOrCompute(validTime) match {
case Some(rdd) =>
Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
case None => None
}
}
}
/**
* TODO
*/
private[streaming]
class MapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
parent: DStream[(K, V)],
mapValueFunc: V => U
) extends DStream[(K, U)](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[(K, U)]] = {
parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
}
}
/**
* TODO
*/
private[streaming]
class FlatMapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
parent: DStream[(K, V)],
flatMapValueFunc: V => TraversableOnce[U]
) extends DStream[(K, U)](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[(K, U)]] = {
parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
}
}
/**
* TODO
*/
class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
extends DStream[T](parents.head.ssc) {
if (parents.length == 0) {
throw new IllegalArgumentException("Empty array of parents")
}
if (parents.map(_.ssc).distinct.size > 1) {
throw new IllegalArgumentException("Array of parents have different StreamingContexts")
}
if (parents.map(_.slideTime).distinct.size > 1) {
throw new IllegalArgumentException("Array of parents have different slide times")
}
override def dependencies = parents.toList
override def slideTime: Time = parents.head.slideTime
override def compute(validTime: Time): Option[RDD[T]] = {
val rdds = new ArrayBuffer[RDD[T]]()
parents.map(_.getOrCompute(validTime)).foreach(_ match {
case Some(rdd) => rdds += rdd
case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
})
if (rdds.size > 0) {
Some(new UnionRDD(ssc.sc, rdds))
} else {
None
}
}
}
/**
* TODO
*/
private[streaming]
class ForEachDStream[T: ClassManifest] (
parent: DStream[T],
foreachFunc: (RDD[T], Time) => Unit
) extends DStream[Unit](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[Unit]] = None
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
}
/**
* TODO
*/
private[streaming]
class TransformedDStream[T: ClassManifest, U: ClassManifest] (
parent: DStream[T],
transformFunc: (RDD[T], Time) => RDD[U]
) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(transformFunc(_, validTime))
}
}
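
Every class in the block removed here reappears under spark.streaming.dstream with the same three-method shape (dependencies, slideTime, compute). As a reference for that shape, a hedged sketch of a hypothetical custom transformation written against the same template (not part of the commit):

package spark.streaming.dstream

import spark.RDD
import spark.streaming.{DStream, Time}

// Hypothetical example: uppercase every record of the parent stream, batch by batch.
private[streaming]
class UpperCasedDStream(parent: DStream[String]) extends DStream[String](parent.ssc) {

  override def dependencies = List(parent)

  override def slideTime: Time = parent.slideTime

  // Derive this batch's RDD from the parent's RDD for the same validTime, if one exists.
  override def compute(validTime: Time): Option[RDD[String]] = {
    parent.getOrCompute(validTime).map(_.map(_.toUpperCase))
  }
}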

View file

@ -1,5 +1,6 @@
package spark.streaming
import dstream.InputDStream
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import collection.mutable.ArrayBuffer
import spark.Logging

View file

@ -1,5 +1,7 @@
package spark.streaming
import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
import spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
import spark.Logging
import spark.SparkEnv

View file

@ -1,6 +1,9 @@
package spark.streaming
import spark.streaming.StreamingContext._
import spark.streaming.dstream.{ReducedWindowedDStream, StateDStream}
import spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream}
import spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
import spark.{Manifests, RDD, Partitioner, HashPartitioner}
import spark.SparkContext._
@ -218,13 +221,13 @@ extends Serializable {
def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
new MapValuesDStream[K, V, U](self, mapValuesFunc)
new MapValuedDStream[K, V, U](self, mapValuesFunc)
}
def flatMapValues[U: ClassManifest](
flatMapValuesFunc: V => TraversableOnce[U]
): DStream[(K, U)] = {
new FlatMapValuesDStream[K, V, U](self, flatMapValuesFunc)
new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
}
def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
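
The user-facing calls are unchanged by this rename; only the backing classes moved. A hedged sketch of the two operations being rewired (assuming, as the imports in this file suggest, that the implicit pair-DStream conversion lives on the StreamingContext companion):

import spark.streaming.DStream
import spark.streaming.StreamingContext._   // implicit conversion to the pair functions (assumed)

object PairOpsSketch {
  def perWordLengths(words: DStream[String]) = {
    val pairs    = words.map(w => (w, w.length))
    val doubled  = pairs.mapValues(_ * 2)                 // now backed by MapValuedDStream
    val exploded = pairs.flatMapValues(len => 1 to len)   // now backed by FlatMapValuedDStream
    (doubled, exploded)
  }
}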

View file

@ -4,9 +4,6 @@ import util.{ManualClock, RecurringTimer, Clock}
import spark.SparkEnv
import spark.Logging
import scala.collection.mutable.HashMap
private[streaming]
class Scheduler(ssc: StreamingContext) extends Logging {

View file

@ -1,10 +1,10 @@
package spark.streaming
import spark.RDD
import spark.Logging
import spark.SparkEnv
import spark.SparkContext
import spark.streaming.dstream._
import spark.{RDD, Logging, SparkEnv, SparkContext}
import spark.storage.StorageLevel
import spark.util.MetadataCleaner
import scala.collection.mutable.Queue
@ -18,7 +18,6 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.hadoop.fs.Path
import java.util.UUID
import spark.util.MetadataCleaner
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@ -126,7 +125,7 @@ class StreamingContext private (
/**
* Create an input stream that pulls messages from a Kafka Broker.
*
* @param host Zookeeper hostname.
* @param hostname Zookeeper hostname.
* @param port Zookeeper port.
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
@ -319,7 +318,7 @@ object StreamingContext {
protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
if (prefix == null) {
time.millis.toString
time.milliseconds.toString
} else if (suffix == null || suffix.length ==0) {
prefix + "-" + time.milliseconds
} else {

View file

@ -1,6 +1,11 @@
package spark.streaming
case class Time(millis: Long) {
/**
* This class is a simple wrapper class that represents time in UTC.
* @param millis Time in UTC, in milliseconds.
*/
case class Time(private val millis: Long) {
def < (that: Time): Boolean = (this.millis < that.millis)
@ -15,7 +20,9 @@ case class Time(millis: Long) {
def - (that: Time): Time = Time(millis - that.millis)
def * (times: Int): Time = Time(millis * times)
def / (that: Time): Long = millis / that.millis
def floor(that: Time): Time = {
val t = that.millis
val m = math.floor(this.millis / t).toLong
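
A quick illustration of the Time arithmetic this file defines (behavior read off the operators and floor implementation shown above; the printed values are only what that implementation implies):

// Illustrative only: exercises the operators visible in this diff.
import spark.streaming.Time

object TimeArithmeticDemo {
  def main(args: Array[String]) {
    val elapsed = Time(3500)
    val batch   = Time(1000)
    println(elapsed / batch)        // 3, Long division of the underlying millis
    println(elapsed - batch)        // Time(2500)
    println(elapsed.floor(batch))   // Time(3000), rounded down to a batch boundary
    println(batch * 30)             // Time(30000)
  }
}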

View file

@ -1,7 +1,8 @@
package spark.streaming
package spark.streaming.dstream
import spark.{RDD, Partitioner}
import spark.rdd.CoGroupedRDD
import spark.streaming.{Time, DStream}
class CoGroupedDStream[K : ClassManifest](
parents: Seq[DStream[(_, _)]],

View file

@ -1,6 +1,7 @@
package spark.streaming
package spark.streaming.dstream
import spark.RDD
import spark.streaming.{Time, StreamingContext}
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
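
A hypothetical test-style usage of ConstantInputDStream (its constructor is not shown in this hunk; the sketch assumes it takes the StreamingContext plus the RDD to re-emit every batch):

import spark.RDD
import spark.streaming.StreamingContext
import spark.streaming.dstream.ConstantInputDStream

object ConstantStreamSketch {
  // Re-emits `fixed` on every batch interval, which keeps tests deterministic.
  def constantStream[T: ClassManifest](ssc: StreamingContext, fixed: RDD[T]) =
    new ConstantInputDStream(ssc, fixed)
}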

View file

@ -1,4 +1,4 @@
package spark.streaming
package spark.streaming.dstream
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

View file

@ -1,7 +1,8 @@
package spark.streaming
package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
import spark.streaming.{StreamingContext, Time}
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration

View file

@ -0,0 +1,21 @@
package spark.streaming.dstream
import spark.streaming.{DStream, Time}
import spark.RDD
private[streaming]
class FilteredDStream[T: ClassManifest](
parent: DStream[T],
filterFunc: T => Boolean
) extends DStream[T](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[T]] = {
parent.getOrCompute(validTime).map(_.filter(filterFunc))
}
}

View file

@ -0,0 +1,20 @@
package spark.streaming.dstream
import spark.streaming.{DStream, Time}
import spark.RDD
import spark.SparkContext._
private[streaming]
class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
parent: DStream[(K, V)],
flatMapValueFunc: V => TraversableOnce[U]
) extends DStream[(K, U)](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[(K, U)]] = {
parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
}
}

View file

@ -0,0 +1,20 @@
package spark.streaming.dstream
import spark.streaming.{DStream, Time}
import spark.RDD
private[streaming]
class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
parent: DStream[T],
flatMapFunc: T => Traversable[U]
) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
}

View file

@ -1,15 +1,20 @@
package spark.streaming
package spark.streaming.dstream
import java.io.{ObjectInput, ObjectOutput, Externalizable}
import spark.streaming.StreamingContext
import spark.Utils
import spark.storage.StorageLevel
import org.apache.flume.source.avro.AvroSourceProtocol
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
import scala.collection.JavaConversions._
import java.net.InetSocketAddress
import collection.JavaConversions._
import spark.Utils
import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
class FlumeInputDStream[T: ClassManifest](

View file

@ -0,0 +1,28 @@
package spark.streaming.dstream
import spark.RDD
import spark.streaming.{DStream, Job, Time}
private[streaming]
class ForEachDStream[T: ClassManifest] (
parent: DStream[T],
foreachFunc: (RDD[T], Time) => Unit
) extends DStream[Unit](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[Unit]] = None
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
}

View file

@ -0,0 +1,17 @@
package spark.streaming.dstream
import spark.streaming.{DStream, Time}
import spark.RDD
private[streaming]
class GlommedDStream[T: ClassManifest](parent: DStream[T])
extends DStream[Array[T]](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[Array[T]]] = {
parent.getOrCompute(validTime).map(_.glom())
}
}

View file

@ -0,0 +1,19 @@
package spark.streaming.dstream
import spark.streaming.{StreamingContext, DStream}
abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
extends DStream[T](ssc_) {
override def dependencies = List()
override def slideTime = {
if (ssc == null) throw new Exception("ssc is null")
if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
ssc.graph.batchDuration
}
def start()
def stop()
}

View file

@ -1,17 +1,21 @@
package spark.streaming
package spark.streaming.dstream
import spark.Logging
import spark.storage.StorageLevel
import spark.streaming.{Time, DStreamCheckpointData, StreamingContext}
import java.util.Properties
import java.util.concurrent.Executors
import kafka.consumer._
import kafka.message.{Message, MessageSet, MessageAndMetadata}
import kafka.serializer.StringDecoder
import kafka.utils.{Utils, ZKGroupTopicDirs}
import kafka.utils.ZkUtils._
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import spark._
import spark.RDD
import spark.storage.StorageLevel
// Key for a specific Kafka Partition: (broker, topic, group, part)
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
@ -24,7 +28,7 @@ case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
/**
* Input stream that pulls messages form a Kafka Broker.
* Input stream that pulls messages from a Kafka Broker.
*
* @param host Zookeeper hostname.
* @param port Zookeeper port.

View file

@ -0,0 +1,21 @@
package spark.streaming.dstream
import spark.streaming.{DStream, Time}
import spark.RDD
private[streaming]
class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
parent: DStream[T],
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean
) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
}
}

View file

@ -0,0 +1,21 @@
package spark.streaming.dstream
import spark.streaming.{DStream, Time}
import spark.RDD
import spark.SparkContext._
private[streaming]
class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
parent: DStream[(K, V)],
mapValueFunc: V => U
) extends DStream[(K, U)](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[(K, U)]] = {
parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
}
}

View file

@ -0,0 +1,20 @@
package spark.streaming.dstream
import spark.streaming.{DStream, Time}
import spark.RDD
private[streaming]
class MappedDStream[T: ClassManifest, U: ClassManifest] (
parent: DStream[T],
mapFunc: T => U
) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}
}

View file

@ -1,12 +1,13 @@
package spark.streaming
package spark.streaming.dstream
import scala.collection.mutable.ArrayBuffer
import spark.streaming.{Time, StreamingContext, AddBlocks, RegisterReceiver, DeregisterReceiver}
import spark.{Logging, SparkEnv, RDD}
import spark.rdd.BlockRDD
import spark.streaming.util.{RecurringTimer, SystemClock}
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import java.nio.ByteBuffer
import akka.actor.{Props, Actor}

View file

@ -1,10 +1,11 @@
package spark.streaming
package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
import spark.streaming.{Time, StreamingContext}
class QueueInputDStream[T: ClassManifest](
@transient ssc: StreamingContext,

View file

@ -1,12 +1,15 @@
package spark.streaming
package spark.streaming.dstream
import spark.{DaemonThread, Logging}
import spark.storage.StorageLevel
import spark.streaming.StreamingContext
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{ReadableByteChannel, SocketChannel}
import java.io.EOFException
import java.util.concurrent.ArrayBlockingQueue
import spark._
import spark.storage.StorageLevel
/**
* An input stream that reads blocks of serialized objects from a given network address.

View file

@ -1,16 +1,15 @@
package spark.streaming
package spark.streaming.dstream
import spark.streaming.StreamingContext._
import spark.RDD
import spark.rdd.UnionRDD
import spark.rdd.CoGroupedRDD
import spark.Partitioner
import spark.SparkContext._
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import collection.SeqProxy
import spark.streaming.{Interval, Time, DStream}
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
parent: DStream[(K, V)],

View file

@ -0,0 +1,27 @@
package spark.streaming.dstream
import spark.{RDD, Partitioner}
import spark.SparkContext._
import spark.streaming.{DStream, Time}
private[streaming]
class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
parent: DStream[(K,V)],
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
partitioner: Partitioner
) extends DStream [(K,C)] (parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[(K,C)]] = {
parent.getOrCompute(validTime) match {
case Some(rdd) =>
Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
case None => None
}
}
}

View file

@ -1,14 +1,10 @@
package spark.streaming
package spark.streaming.dstream
import spark.streaming.util.{RecurringTimer, SystemClock}
import spark.streaming.StreamingContext
import spark.storage.StorageLevel
import java.io._
import java.net.Socket
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer
import scala.Serializable
class SocketInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,

View file

@ -1,11 +1,10 @@
package spark.streaming
package spark.streaming.dstream
import spark.RDD
import spark.rdd.BlockRDD
import spark.Partitioner
import spark.rdd.MapPartitionsRDD
import spark.SparkContext._
import spark.storage.StorageLevel
import spark.streaming.{Time, DStream}
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
parent: DStream[(K, V)],

View file

@ -0,0 +1,19 @@
package spark.streaming.dstream
import spark.RDD
import spark.streaming.{DStream, Time}
private[streaming]
class TransformedDStream[T: ClassManifest, U: ClassManifest] (
parent: DStream[T],
transformFunc: (RDD[T], Time) => RDD[U]
) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(transformFunc(_, validTime))
}
}

View file

@ -0,0 +1,39 @@
package spark.streaming.dstream
import spark.streaming.{DStream, Time}
import spark.RDD
import collection.mutable.ArrayBuffer
import spark.rdd.UnionRDD
class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
extends DStream[T](parents.head.ssc) {
if (parents.length == 0) {
throw new IllegalArgumentException("Empty array of parents")
}
if (parents.map(_.ssc).distinct.size > 1) {
throw new IllegalArgumentException("Array of parents have different StreamingContexts")
}
if (parents.map(_.slideTime).distinct.size > 1) {
throw new IllegalArgumentException("Array of parents have different slide times")
}
override def dependencies = parents.toList
override def slideTime: Time = parents.head.slideTime
override def compute(validTime: Time): Option[RDD[T]] = {
val rdds = new ArrayBuffer[RDD[T]]()
parents.map(_.getOrCompute(validTime)).foreach(_ match {
case Some(rdd) => rdds += rdd
case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
})
if (rdds.size > 0) {
Some(new UnionRDD(ssc.sc, rdds))
} else {
None
}
}
}

View file

@ -1,8 +1,9 @@
package spark.streaming
package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
import spark.storage.StorageLevel
import spark.streaming.{Interval, Time, DStream}
class WindowedDStream[T: ClassManifest](

View file

@ -25,7 +25,7 @@ object GrepRaw {
val rawStreams = (1 to numStreams).map(_ =>
ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
val union = new UnionDStream(rawStreams)
val union = ssc.union(rawStreams)
union.filter(_.contains("Alice")).count().foreach(r =>
println("Grep count: " + r.collect().mkString))
ssc.start()

View file

@ -34,7 +34,7 @@ object TopKWordCountRaw {
val lines = (1 to numStreams).map(_ => {
ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
})
val union = new UnionDStream(lines.toArray)
val union = ssc.union(lines)
val counts = union.mapPartitions(splitAndCountPartitions)
val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))

View file

@ -33,7 +33,7 @@ object WordCountRaw {
val lines = (1 to numStreams).map(_ => {
ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
})
val union = new UnionDStream(lines.toArray)
val union = ssc.union(lines)
val counts = union.mapPartitions(splitAndCountPartitions)
val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
windowedCounts.foreach(r => println("# unique words = " + r.count()))

View file

@ -42,7 +42,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val stateStreamCheckpointInterval = Seconds(1)
// this ensures checkpointing occurs at least once
val firstNumBatches = (stateStreamCheckpointInterval.millis / batchDuration.millis) * 2
val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2
val secondNumBatches = firstNumBatches
// Setup the streams

View file

@ -133,7 +133,7 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter {
// Get the output buffer
val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
val output = outputStream.output
val waitTime = (batchDuration.millis * (numBatches.toDouble + 0.5)).toLong
val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong
val startTime = System.currentTimeMillis()
try {

View file

@ -1,5 +1,6 @@
package spark.streaming
import dstream.SparkFlumeEvent
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}

View file

@ -1,12 +1,16 @@
package spark.streaming
import spark.streaming.dstream.{InputDStream, ForEachDStream}
import spark.streaming.util.ManualClock
import spark.{RDD, Logging}
import util.ManualClock
import collection.mutable.ArrayBuffer
import org.scalatest.FunSuite
import collection.mutable.SynchronizedBuffer
import java.io.{ObjectInputStream, IOException}
import org.scalatest.FunSuite
/**
* This is an input stream just for the test suites. It is equivalent to a checkpointable,
@ -70,6 +74,10 @@ trait TestSuiteBase extends FunSuite with Logging {
def actuallyWait = false
/**
* Set up required DStreams to test the unary DStream operation using the sequence
* of input collections.
*/
def setupStreams[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V]
@ -90,6 +98,10 @@ trait TestSuiteBase extends FunSuite with Logging {
ssc
}
/**
* Set up required DStreams to test the binary operation using the two sequences
* of input collections.
*/
def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
@ -173,6 +185,11 @@ trait TestSuiteBase extends FunSuite with Logging {
output
}
/**
* Verify whether the output values after running a DStream operation
* are the same as the expected output values, by comparing the output
* collections either as lists (order matters) or sets (order does not matter)
*/
def verifyOutput[V: ClassManifest](
output: Seq[Seq[V]],
expectedOutput: Seq[Seq[V]],
@ -199,6 +216,10 @@ trait TestSuiteBase extends FunSuite with Logging {
logInfo("Output verified successfully")
}
/**
* Test unary DStream operation with a list of inputs, with the number of
* batches to run equal to the number of expected output values
*/
def testOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
@ -208,6 +229,15 @@ trait TestSuiteBase extends FunSuite with Logging {
testOperation[U, V](input, operation, expectedOutput, -1, useSet)
}
/**
* Test unary DStream operation with a list of inputs
* @param input Sequence of input collections
* @param operation Unary DStream operation to be applied to the input
* @param expectedOutput Sequence of expected output collections
* @param numBatches Number of batches to run the operation for
* @param useSet Compare the output values with the expected output values
* as sets (order does not matter) or as lists (order matters)
*/
def testOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
@ -221,6 +251,10 @@ trait TestSuiteBase extends FunSuite with Logging {
verifyOutput[V](output, expectedOutput, useSet)
}
/**
* Test binary DStream operation with two lists of inputs, with the number of
* batches to run equal to the number of expected output values
*/
def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
@ -231,6 +265,16 @@ trait TestSuiteBase extends FunSuite with Logging {
testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet)
}
/**
* Test binary DStream operation with two lists of inputs
* @param input1 First sequence of input collections
* @param input2 Second sequence of input collections
* @param operation Binary DStream operation to be applied to the 2 inputs
* @param expectedOutput Sequence of expected output collections
* @param numBatches Number of batches to run the operation for
* @param useSet Compare the output values with the expected output values
* as sets (order does not matter) or as lists (order matters)
*/
def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
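
Taken together, these helpers let a concrete suite be written compactly. A hedged sketch (suite name and operation are hypothetical; the pattern mirrors the WindowOperationsSuite changes below):

package spark.streaming

// Hypothetical suite built on the TestSuiteBase helpers documented above.
class MapOperationSuite extends TestSuiteBase {
  test("map doubles every element") {
    val input          = Seq(Seq(1, 2), Seq(3, 4))
    val expectedOutput = Seq(Seq(2, 4), Seq(6, 8))
    val operation = (s: DStream[Int]) => s.map(_ * 2)
    // One batch is run per expected output collection; the final argument selects
    // list comparison (order matters) rather than set comparison.
    testOperation(input, operation, expectedOutput, false)
  }
}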

View file

@ -209,7 +209,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = bigGroupByOutput.map(_.map(x => (x._1, x._2.toSet)))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.groupByKeyAndWindow(windowTime, slideTime)
.map(x => (x._1, x._2.toSet))
@ -223,7 +223,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = Seq( Seq(1), Seq(2), Seq(3), Seq(3), Seq(1), Seq(0))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[Int]) => s.countByWindow(windowTime, slideTime)
testOperation(input, operation, expectedOutput, numBatches, true)
}
@ -233,7 +233,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3)))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.countByKeyAndWindow(windowTime, slideTime).map(x => (x._1, x._2.toInt))
}
@ -251,7 +251,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("window - " + name) {
val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[Int]) => s.window(windowTime, slideTime)
testOperation(input, operation, expectedOutput, numBatches, true)
}
@ -265,7 +265,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("reduceByKeyAndWindow - " + name) {
val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist()
}
@ -281,7 +281,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("reduceByKeyAndWindowInv - " + name) {
val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime)
.persist()