Merge branch 'dev' of github.com:radlab/spark into dev

This commit is contained in:
commit 0681bbc5d9

@@ -143,7 +143,7 @@ extends Logging with Serializable {
  /**
   * This method generates a SparkStreaming job for the given time
   * and may require to be overriden by subclasses
   * and may required to be overriden by subclasses
   */
  def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {

@@ -208,7 +208,7 @@ extends Logging with Serializable {
    new TransformedDStream(this, ssc.sc.clean(transformFunc))
  }

  def toQueue = {
  def toBlockingQueue = {
    val queue = new ArrayBlockingQueue[RDD[T]](10000)
    this.foreachRDD(rdd => {
      queue.add(rdd)

@@ -256,6 +256,28 @@ extends Logging with Serializable {
  def union(that: DStream[T]) = new UnifiedDStream(Array(this, that))

  def slice(interval: Interval): Seq[RDD[T]] = {
    slice(interval.beginTime, interval.endTime)
  }

  // Get all the RDDs between fromTime to toTime (both included)
  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
    val rdds = new ArrayBuffer[RDD[T]]()
    var time = toTime.floor(slideTime)
    while (time >= zeroTime && time >= fromTime) {
      getOrCompute(time) match {
        case Some(rdd) => rdds += rdd
        case None => throw new Exception("Could not get old reduced RDD for time " + time)
      }
      time -= slideTime
    }
    rdds.toSeq
  }

  def register() {
    ssc.registerOutputStream(this)
  }

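The new slice(fromTime, toTime) walks backwards from toTime (floored to a slide boundary) down to fromTime, materializing one RDD per slideTime step through getOrCompute. A stand-alone sketch of that walk-back loop, with Time modeled as a Long of milliseconds and RDDs as plain strings (all names here are illustrative, not part of the commit):

object SliceSketch {
  def slice(fromTime: Long, toTime: Long, zeroTime: Long, slideTime: Long,
            getOrCompute: Long => Option[String]): Seq[String] = {
    val out = scala.collection.mutable.ArrayBuffer[String]()
    // Floor toTime to a slide boundary, as DStream.slice does with Time.floor.
    var time = (toTime / slideTime) * slideTime
    while (time >= zeroTime && time >= fromTime) {
      getOrCompute(time) match {
        case Some(v) => out += v
        case None => throw new Exception("Could not get old reduced RDD for time " + time)
      }
      time -= slideTime
    }
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Pretend every slide boundary has an RDD named "rdd@<t>".
    val rdds = slice(2000, 5000, 0, 1000, t => Some("rdd@" + t))
    println(rdds.mkString(", ")) // rdd@5000, rdd@4000, rdd@3000, rdd@2000
  }
}

Note the result comes out newest-first, exactly as the loop collects it.
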
@@ -9,6 +9,10 @@ case class Interval(beginTime: Time, endTime: Time) {
    new Interval(beginTime + time, endTime + time)
  }

  def - (time: Time): Interval = {
    new Interval(beginTime - time, endTime - time)
  }

  def < (that: Interval): Boolean = {
    if (this.duration != that.duration) {
      throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]")

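For context, a self-contained sketch of the Interval algebra this hunk extends, assuming Time behaves like a millisecond count: shifting with + and the new -, and ordering same-duration intervals by their position on the time axis (the duration check mirrors the exception above; names are illustrative):

case class SketchInterval(beginTime: Long, endTime: Long) {
  def duration: Long = endTime - beginTime
  def + (time: Long) = SketchInterval(beginTime + time, endTime + time)
  def - (time: Long) = SketchInterval(beginTime - time, endTime - time)
  def < (that: SketchInterval): Boolean = {
    if (this.duration != that.duration) {
      throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]")
    }
    this.endTime < that.endTime
  }
}

object IntervalSketch {
  def main(args: Array[String]): Unit = {
    val w = SketchInterval(0, 2000)
    println(w + 1000)      // SketchInterval(1000,3000)
    println((w - 500) < w) // true: the shifted window sits earlier on the axis
  }
}
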
@@ -12,7 +12,7 @@ import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer

class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
    parent: DStream[(K, V)],
    @transient parent: DStream[(K, V)],
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    _windowTime: Time,

@@ -28,9 +28,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
    throw new Exception("The slide duration of ReducedWindowedDStream (" + _slideTime + ") " +
      "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")

  val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
  val allowPartialWindows = true
  //reducedStream.persist(StorageLevel.MEMORY_ONLY_DESER_2)
  @transient val reducedStream = parent.reduceByKey(reduceFunc, partitioner)

  override def dependencies = List(reducedStream)

@@ -44,174 +42,95 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
      checkpointInterval: Time): DStream[(K,V)] = {
    super.persist(storageLevel, checkpointLevel, checkpointInterval)
    reducedStream.persist(storageLevel, checkpointLevel, checkpointInterval)
    this
  }

  override def compute(validTime: Time): Option[RDD[(K, V)]] = {

    // Notation:
  override def compute(validTime: Time): Option[RDD[(K, V)]] = {
    val currentTime = validTime
    val currentWindow = Interval(currentTime - windowTime + parent.slideTime, currentTime)
    val previousWindow = currentWindow - slideTime

    logDebug("Window time = " + windowTime)
    logDebug("Slide time = " + slideTime)
    logDebug("ZeroTime = " + zeroTime)
    logDebug("Current window = " + currentWindow)
    logDebug("Previous window = " + previousWindow)

    //  _____________________________
    // |  previous window   _________|___________________
    // |___________________|       current window        |  --------------> Time
    // |  previous window   _________|___________________
    // |___________________|       current window        |  --------------> Time
    //                     |_____________________________|
    //
    //
    // |________ _________|          |________ _________|
    //          |                             |
    //          V                             V
    //     old time steps                new time steps
    //       old RDDs                      new RDDs
    //
    def getAdjustedWindow(endTime: Time, windowTime: Time): Interval = {
      val beginTime =
        if (allowPartialWindows && endTime - windowTime < parent.zeroTime) {
          parent.zeroTime
        } else {
          endTime - windowTime
        }
      Interval(beginTime, endTime)
    }

    val currentTime = validTime
    val currentWindow = getAdjustedWindow(currentTime, windowTime)
    val previousWindow = getAdjustedWindow(currentTime - slideTime, windowTime)

    logInfo("Current window = " + currentWindow)
    logInfo("Slide time = " + slideTime)
    logInfo("Previous window = " + previousWindow)
    logInfo("Parent.zeroTime = " + parent.zeroTime)

    if (allowPartialWindows) {
      if (currentTime - slideTime <= parent.zeroTime) {
        reducedStream.getOrCompute(currentTime) match {
          case Some(rdd) => return Some(rdd)
          case None => throw new Exception("Could not get first reduced RDD for time " + currentTime)
        }
      }
    } else {
      if (previousWindow.beginTime < parent.zeroTime) {
        if (currentWindow.beginTime < parent.zeroTime) {
          return None
        } else {
          // If this is the first feasible window, then generate reduced value in the naive manner
          val reducedRDDs = new ArrayBuffer[RDD[(K, V)]]()
          var t = currentWindow.endTime
          while (t > currentWindow.beginTime) {
            reducedStream.getOrCompute(t) match {
              case Some(rdd) => reducedRDDs += rdd
              case None => throw new Exception("Could not get reduced RDD for time " + t)
            }
            t -= reducedStream.slideTime
          }
          if (reducedRDDs.size == 0) {
            throw new Exception("Could not generate the first RDD for time " + validTime)
          }
          return Some(new UnionRDD(ssc.sc, reducedRDDs).reduceByKey(partitioner, reduceFunc))
        }
      }
    }

    // Get the RDD of the reduced value of the previous window
    val previousWindowRDD = getOrCompute(previousWindow.endTime) match {
      case Some(rdd) => rdd.asInstanceOf[RDD[(_, _)]]
      case None => throw new Exception("Could not get previous RDD for time " + previousWindow.endTime)
    }

    val oldRDDs = new ArrayBuffer[RDD[(_, _)]]()
    val newRDDs = new ArrayBuffer[RDD[(_, _)]]()

    // Get the RDDs of the reduced values in "old time steps"
    var t = currentWindow.beginTime
    while (t > previousWindow.beginTime) {
      reducedStream.getOrCompute(t) match {
        case Some(rdd) => oldRDDs += rdd.asInstanceOf[RDD[(_, _)]]
        case None => throw new Exception("Could not get old reduced RDD for time " + t)
      }
      t -= reducedStream.slideTime
    }
    val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideTime)
    logDebug("# old RDDs = " + oldRDDs.size)

    // Get the RDDs of the reduced values in "new time steps"
    t = currentWindow.endTime
    while (t > previousWindow.endTime) {
      reducedStream.getOrCompute(t) match {
        case Some(rdd) => newRDDs += rdd.asInstanceOf[RDD[(_, _)]]
        case None => throw new Exception("Could not get new reduced RDD for time " + t)
      }
      t -= reducedStream.slideTime
    val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideTime, currentWindow.endTime)
    logDebug("# new RDDs = " + newRDDs.size)

    // Get the RDD of the reduced value of the previous window
    val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))

    // Make the list of RDDs that needs to be cogrouped together for reducing their reduced values
    val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs

    // Cogroup the reduced RDDs and merge the reduced values
    val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
    val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
    val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValuesFunc)

    Some(mergedValuesRDD)
  }

  def mergeValues(numOldValues: Int, numNewValues: Int)(seqOfValues: Seq[Seq[V]]): V = {

    if (seqOfValues.size != 1 + numOldValues + numNewValues) {
      throw new Exception("Unexpected number of sequences of reduced values")
    }

    val allRDDs = new ArrayBuffer[RDD[(_, _)]]()
    allRDDs += previousWindowRDD
    allRDDs ++= oldRDDs
    allRDDs ++= newRDDs

    // Getting reduced values "old time steps" that will be removed from current window
    val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)

    val numOldRDDs = oldRDDs.size
    val numNewRDDs = newRDDs.size
    logInfo("Generated numOldRDDs = " + numOldRDDs + ", numNewRDDs = " + numNewRDDs)
    logInfo("Generating CoGroupedRDD with " + allRDDs.size + " RDDs")
    val newRDD = new CoGroupedRDD[K](allRDDs.toSeq, partitioner).asInstanceOf[RDD[(K,Seq[Seq[V]])]].map(x => {
      val (key, value) = x
      logDebug("value.size = " + value.size + ", numOldRDDs = " + numOldRDDs + ", numNewRDDs = " + numNewRDDs)
      if (value.size != 1 + numOldRDDs + numNewRDDs) {
        throw new Exception("Number of groups not odd!")
    // Getting reduced values "new time steps"
    val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)

    if (seqOfValues(0).isEmpty) {

      // If previous window's reduce value does not exist, then at least new values should exist
      if (newValues.isEmpty) {
        throw new Exception("Neither previous window has value for key, nor new values found")
      }

      // old values = reduced values of the "old time steps" that are eliminated from current window
      // new values = reduced values of the "new time steps" that are introduced to the current window
      // previous value = reduced value of the previous window
      // Reduce the new values
      // println("new values = " + newValues.map(_.toString).reduce(_ + " " + _))
      return newValues.reduce(reduceFunc)
    } else {

      /*val numOldValues = (value.size - 1) / 2*/
      // Getting reduced values "old time steps"
      val oldValues =
        (0 until numOldRDDs).map(i => value(1 + i)).filter(_.size > 0).map(x => x(0))
      // Getting reduced values "new time steps"
      val newValues =
        (0 until numNewRDDs).map(i => value(1 + numOldRDDs + i)).filter(_.size > 0).map(x => x(0))

      // If reduced value for the key does not exist in previous window, it should not exist in "old time steps"
      if (value(0).size == 0 && oldValues.size != 0) {
        throw new Exception("Unexpected: Key exists in old reduced values but not in previous reduced values")
      // Get the previous window's reduced value
      var tempValue = seqOfValues(0).head

      // If old values exist, then inverse-reduce them from previous value
      if (!oldValues.isEmpty) {
        // println("old values = " + oldValues.map(_.toString).reduce(_ + " " + _))
        tempValue = invReduceFunc(tempValue, oldValues.reduce(reduceFunc))
      }

      // For the key, at least one of "old time steps", "new time steps" and previous window should have reduced values
      if (value(0).size == 0 && oldValues.size == 0 && newValues.size == 0) {
        throw new Exception("Unexpected: Key does not exist in any of old, new, or previour reduced values")
      // If new values exist, then reduce them with previous value
      if (!newValues.isEmpty) {
        // println("new values = " + newValues.map(_.toString).reduce(_ + " " + _))
        tempValue = reduceFunc(tempValue, newValues.reduce(reduceFunc))
      }

      // Logic to generate the final reduced value for current window:
      //
      // If previous window did not have reduced value for the key
      //   Then, return reduced value of "new time steps" as the final value
      // Else, reduced value exists in previous window
      //   If "old" time steps did not have reduced value for the key
      //     Then, reduce previous window's reduced value with that of "new time steps" for final value
      //   Else, reduced values exist in "old time steps"
      //     If "new values" did not have reduced value for the key
      //       Then, inverse-reduce "old values" from previous window's reduced value for final value
      //     Else, all 3 values exist, combine all of them together
      //
      logDebug("# old values = " + oldValues.size + ", # new values = " + newValues)
      val finalValue = {
        if (value(0).size == 0) {
          newValues.reduce(reduceFunc)
        } else {
          val prevValue = value(0)(0)
          logDebug("prev value = " + prevValue)
          if (oldValues.size == 0) {
            // assuming newValue.size > 0 (all 3 cannot be zero, as checked earlier)
            val temp = newValues.reduce(reduceFunc)
            reduceFunc(prevValue, temp)
          } else if (newValues.size == 0) {
            invReduceFunc(prevValue, oldValues.reduce(reduceFunc))
          } else {
            val tempValue = invReduceFunc(prevValue, oldValues.reduce(reduceFunc))
            reduceFunc(tempValue, newValues.reduce(reduceFunc))
          }
        }
      }
      (key, finalValue)
    })
    //newRDD.persist(StorageLevel.MEMORY_ONLY_DESER_2)
    Some(newRDD)
      // println("final value = " + tempValue)
      return tempValue
    }
  }
}

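The rewritten compute() above replaces per-window recomputation with an incremental update. The identity it exploits is: reduced(current window) = reduceFunc(invReduceFunc(reduced(previous window), reduced(old steps)), reduceFunc over new steps). A minimal self-contained sketch of that identity over per-batch word counts, using plain Maps in place of RDDs (object and function names are illustrative, not from the commit):

object IncrementalWindowSketch {
  type Counts = Map[String, Int]

  // reduceFunc: add counts key-by-key.
  def reduceCounts(a: Counts, b: Counts): Counts =
    (a.keySet ++ b.keySet).map(k => k -> (a.getOrElse(k, 0) + b.getOrElse(k, 0))).toMap

  // invReduceFunc: subtract counts key-by-key.
  def invReduceCounts(a: Counts, b: Counts): Counts =
    (a.keySet ++ b.keySet).map(k => k -> (a.getOrElse(k, 0) - b.getOrElse(k, 0))).toMap

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Map("a" -> 1),            // t = 1
      Map("a" -> 1, "b" -> 1),  // t = 2
      Map("c" -> 1),            // t = 3
      Map("b" -> 1)             // t = 4
    )
    // Window over t in [2, 3], computed naively:
    val naive = reduceCounts(batches(1), batches(2))
    // Window over t in [3, 4], computed incrementally from the previous window:
    // subtract the batch that left (t = 2), add the batch that entered (t = 4).
    val incremental = reduceCounts(invReduceCounts(naive, batches(1)), batches(3))
    println(naive)       // Map(a -> 1, b -> 1, c -> 1)
    println(incremental) // Map(a -> 0, b -> 1, c -> 1) -- "a" lingers with count 0
  }
}

Note how "a" survives with count 0 in the incremental result; this is exactly the key-retention behavior the new DStreamWindowSuite documents for the inverse-reduce path.
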
@@ -40,7 +40,7 @@ extends Logging {
  }

  def generateRDDs (time: Time) {
    println("\n-----------------------------------------------------\n")
    logInfo("\n-----------------------------------------------------\n")
    logInfo("Generating RDDs for time " + time)
    outputStreams.foreach(outputStream => {
      outputStream.generateJob(time) match {

@@ -1,12 +1,8 @@
package spark.streaming

import spark.streaming.StreamingContext._

import spark.RDD
import spark.UnionRDD
import spark.SparkContext._

import scala.collection.mutable.ArrayBuffer

class WindowedDStream[T: ClassManifest](
    parent: DStream[T],

@@ -22,8 +18,6 @@ class WindowedDStream[T: ClassManifest](
    throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " +
      "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")

  val allowPartialWindows = true

  override def dependencies = List(parent)

  def windowTime: Time = _windowTime

@@ -31,36 +25,8 @@ class WindowedDStream[T: ClassManifest](
  override def slideTime: Time = _slideTime

  override def compute(validTime: Time): Option[RDD[T]] = {
    val parentRDDs = new ArrayBuffer[RDD[T]]()
    val windowEndTime = validTime
    val windowStartTime = if (allowPartialWindows && windowEndTime - windowTime < parent.zeroTime) {
        parent.zeroTime
      } else {
        windowEndTime - windowTime
      }

    logInfo("Window = " + windowStartTime + " - " + windowEndTime)
    logInfo("Parent.zeroTime = " + parent.zeroTime)

    if (windowStartTime >= parent.zeroTime) {
      // Walk back through time, from the 'windowEndTime' to 'windowStartTime'
      // and get all parent RDDs from the parent DStream
      var t = windowEndTime
      while (t > windowStartTime) {
        parent.getOrCompute(t) match {
          case Some(rdd) => parentRDDs += rdd
          case None => throw new Exception("Could not generate parent RDD for time " + t)
        }
        t -= parent.slideTime
      }
    }

    // Do a union of all parent RDDs to generate the new RDD
    if (parentRDDs.size > 0) {
      Some(new UnionRDD(ssc.sc, parentRDDs))
    } else {
      None
    }
    val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime)
    Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
  }
}

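The rewritten compute() collapses the old walk-back loop to two lines: slice the parent's RDDs inside the current window and union them. A sketch of the same idea over in-memory batches, with flatten standing in for UnionRDD and partial windows allowed at the start of the stream (illustrative names, not the DStream API):

object WindowUnionSketch {
  def windowed[T](batches: Seq[Seq[T]], windowSize: Int): Seq[Seq[T]] =
    batches.indices.map { i =>
      val start = math.max(0, i - windowSize + 1) // partial windows at stream start
      batches.slice(start, i + 1).flatten         // "slice" the window, then "union"
    }

  def main(args: Array[String]): Unit = {
    println(windowed(Seq(Seq(1), Seq(2), Seq(3), Seq(4)), 2))
    // Vector(List(1), List(1, 2), List(2, 3), List(3, 4))
  }
}
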
@@ -0,0 +1,67 @@
package spark.streaming

import spark.streaming.StreamingContext._
import scala.runtime.RichInt

class DStreamBasicSuite extends DStreamSuiteBase {

  test("map-like operations") {
    val input = Seq(1 to 4, 5 to 8, 9 to 12)

    // map
    testOperation(input, (r: DStream[Int]) => r.map(_.toString), input.map(_.map(_.toString)))

    // flatMap
    testOperation(
      input,
      (r: DStream[Int]) => r.flatMap(x => Seq(x, x * 2)),
      input.map(_.flatMap(x => Array(x, x * 2)))
    )
  }

  test("shuffle-based operations") {
    // reduceByKey
    testOperation(
      Seq(Seq("a", "a", "b"), Seq("", ""), Seq()),
      (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
      Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
      true
    )

    // reduce
    testOperation(
      Seq(1 to 4, 5 to 8, 9 to 12),
      (s: DStream[Int]) => s.reduce(_ + _),
      Seq(Seq(10), Seq(26), Seq(42))
    )
  }

  test("stateful operations") {
    val inputData =
      Seq(
        Seq("a", "b", "c"),
        Seq("a", "b", "c"),
        Seq("a", "b", "c")
      )

    val outputData =
      Seq(
        Seq(("a", 1), ("b", 1), ("c", 1)),
        Seq(("a", 2), ("b", 2), ("c", 2)),
        Seq(("a", 3), ("b", 3), ("c", 3))
      )

    val updateStateOp = (s: DStream[String]) => {
      val updateFunc = (values: Seq[Int], state: RichInt) => {
        var newState = 0
        if (values != null) newState += values.reduce(_ + _)
        if (state != null) newState += state.self
        //println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
        new RichInt(newState)
      }
      s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
    }

    testOperation(inputData, updateStateOp, outputData, true)
  }
}

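The expectedOutput of the stateful test is simply a running per-key count: each batch's occurrences folded into the accumulated state. A sketch of that semantics with plain collections (illustrative names; not the updateStateByKey API itself):

object UpdateStateSketch {
  def runningCounts(batches: Seq[Seq[String]]): Seq[Map[String, Int]] =
    batches.scanLeft(Map.empty[String, Int]) { (state, batch) =>
      // Fold this batch's keys into the accumulated counts.
      batch.foldLeft(state)((s, k) => s.updated(k, s.getOrElse(k, 0) + 1))
    }.tail // drop the empty initial state

  def main(args: Array[String]): Unit = {
    val input = Seq(Seq("a", "b", "c"), Seq("a", "b", "c"), Seq("a", "b", "c"))
    runningCounts(input).foreach(println)
    // Map(a -> 1, b -> 1, c -> 1)
    // Map(a -> 2, b -> 2, c -> 2)
    // Map(a -> 3, b -> 3, c -> 3)
  }
}
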
@@ -1,123 +0,0 @@
package spark.streaming

import spark.Logging
import spark.streaming.StreamingContext._
import spark.streaming.util.ManualClock

import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter

import scala.collection.mutable.ArrayBuffer
import scala.runtime.RichInt

class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {

  var ssc: StreamingContext = null
  val batchDurationMillis = 1000

  System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")

  def testOp[U: ClassManifest, V: ClassManifest](
      input: Seq[Seq[U]],
      operation: DStream[U] => DStream[V],
      expectedOutput: Seq[Seq[V]],
      useSet: Boolean = false
    ) {
    try {
      ssc = new StreamingContext("local", "test")
      ssc.setBatchDuration(Milliseconds(batchDurationMillis))

      val inputStream = ssc.createQueueStream(input.map(ssc.sc.makeRDD(_, 2)).toIterator)
      val outputStream = operation(inputStream)
      val outputQueue = outputStream.toQueue

      ssc.start()
      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
      clock.addToTime(input.size * batchDurationMillis)

      Thread.sleep(1000)

      val output = new ArrayBuffer[Seq[V]]()
      while(outputQueue.size > 0) {
        val rdd = outputQueue.take()
        output += (rdd.collect())
      }

      assert(output.size === expectedOutput.size)
      for (i <- 0 until output.size) {
        if (useSet) {
          assert(output(i).toSet === expectedOutput(i).toSet)
        } else {
          assert(output(i).toList === expectedOutput(i).toList)
        }
      }
    } finally {
      ssc.stop()
    }
  }

  test("map-like operations") {
    val inputData = Seq(1 to 4, 5 to 8, 9 to 12)

    // map
    testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString)))

    // flatMap
    testOp(
      inputData,
      (r: DStream[Int]) => r.flatMap(x => Seq(x, x * 2)),
      inputData.map(_.flatMap(x => Array(x, x * 2)))
    )
  }

  test("shuffle-based operations") {
    // reduceByKey
    testOp(
      Seq(Seq("a", "a", "b"), Seq("", ""), Seq()),
      (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
      Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
      true
    )

    // reduce
    testOp(
      Seq(1 to 4, 5 to 8, 9 to 12),
      (s: DStream[Int]) => s.reduce(_ + _),
      Seq(Seq(10), Seq(26), Seq(42))
    )
  }

  test("window-based operations") {

  }

  test("stateful operations") {
    val inputData =
      Seq(
        Seq("a", "b", "c"),
        Seq("a", "b", "c"),
        Seq("a", "b", "c")
      )

    val outputData =
      Seq(
        Seq(("a", 1), ("b", 1), ("c", 1)),
        Seq(("a", 2), ("b", 2), ("c", 2)),
        Seq(("a", 3), ("b", 3), ("c", 3))
      )

    val updateStateOp = (s: DStream[String]) => {
      val updateFunc = (values: Seq[Int], state: RichInt) => {
        var newState = 0
        if (values != null) newState += values.reduce(_ + _)
        if (state != null) newState += state.self
        println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
        new RichInt(newState)
      }
      s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
    }

    testOp(inputData, updateStateOp, outputData, true)
  }
}

@@ -0,0 +1,68 @@
package spark.streaming

import spark.{RDD, Logging}
import util.ManualClock
import collection.mutable.ArrayBuffer
import org.scalatest.FunSuite
import scala.collection.mutable.Queue

trait DStreamSuiteBase extends FunSuite with Logging {

  def batchDuration() = Seconds(1)

  def maxWaitTimeMillis() = 10000

  def testOperation[U: ClassManifest, V: ClassManifest](
      input: Seq[Seq[U]],
      operation: DStream[U] => DStream[V],
      expectedOutput: Seq[Seq[V]],
      useSet: Boolean = false
    ) {

    val manualClock = true

    if (manualClock) {
      System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
    }

    val ssc = new StreamingContext("local", "test")

    try {
      ssc.setBatchDuration(Milliseconds(batchDuration))

      val inputQueue = new Queue[RDD[U]]()
      inputQueue ++= input.map(ssc.sc.makeRDD(_, 2))
      val emptyRDD = ssc.sc.makeRDD(Seq[U](), 2)

      val inputStream = ssc.createQueueStream(inputQueue, true, emptyRDD)
      val outputStream = operation(inputStream)

      val output = new ArrayBuffer[Seq[V]]()
      outputStream.foreachRDD(rdd => output += rdd.collect())

      ssc.start()

      val clock = ssc.scheduler.clock
      if (clock.isInstanceOf[ManualClock]) {
        clock.asInstanceOf[ManualClock].addToTime(input.size * batchDuration.milliseconds)
      }

      val startTime = System.currentTimeMillis()
      while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
        Thread.sleep(500)
      }

      assert(output.size === expectedOutput.size)
      for (i <- 0 until output.size) {
        if (useSet) {
          assert(output(i).toSet === expectedOutput(i).toSet)
        } else {
          assert(output(i).toList === expectedOutput(i).toList)
        }
      }
    } finally {
      ssc.stop()
    }
  }
}

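The harness works by swapping in a manual clock via the "spark.streaming.clock" property and then advancing it by input.size batches at once. A minimal sketch of the kind of clock that makes this pattern work (an assumption for illustration; not the actual spark.streaming.util.ManualClock source):

class SketchManualClock {
  private var time = 0L

  def currentTime(): Long = synchronized { time }

  // Advance time manually and wake any thread blocked in waitUntil().
  def addToTime(delta: Long): Unit = synchronized {
    time += delta
    notifyAll()
  }

  // Block until the clock has been advanced to at least `target`.
  def waitUntil(target: Long): Long = synchronized {
    while (time < target) wait(100)
    time
  }
}

With such a clock the scheduler sleeps in waitUntil until the test calls addToTime, so all batches fire deterministically and the test only has to poll for the expected number of outputs.
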
@@ -0,0 +1,188 @@
package spark.streaming

import spark.streaming.StreamingContext._

class DStreamWindowSuite extends DStreamSuiteBase {

  val largerSlideInput = Seq(
    Seq(("a", 1)),  // 1st window from here
    Seq(("a", 2)),
    Seq(("a", 3)),  // 2nd window from here
    Seq(("a", 4)),
    Seq(("a", 5)),  // 3rd window from here
    Seq(("a", 6)),
    Seq(),          // 4th window from here
    Seq(),
    Seq()           // 5th window from here
  )

  val largerSlideOutput = Seq(
    Seq(("a", 1)),
    Seq(("a", 6)),
    Seq(("a", 14)),
    Seq(("a", 15)),
    Seq(("a", 6))
  )

  val bigInput = Seq(
    Seq(("a", 1)),
    Seq(("a", 1), ("b", 1)),
    Seq(("a", 1), ("b", 1), ("c", 1)),
    Seq(("a", 1), ("b", 1)),
    Seq(("a", 1)),
    Seq(),
    Seq(("a", 1)),
    Seq(("a", 1), ("b", 1)),
    Seq(("a", 1), ("b", 1), ("c", 1)),
    Seq(("a", 1), ("b", 1)),
    Seq(("a", 1)),
    Seq()
  )

  val bigOutput = Seq(
    Seq(("a", 1)),
    Seq(("a", 2), ("b", 1)),
    Seq(("a", 2), ("b", 2), ("c", 1)),
    Seq(("a", 2), ("b", 2), ("c", 1)),
    Seq(("a", 2), ("b", 1)),
    Seq(("a", 1)),
    Seq(("a", 1)),
    Seq(("a", 2), ("b", 1)),
    Seq(("a", 2), ("b", 2), ("c", 1)),
    Seq(("a", 2), ("b", 2), ("c", 1)),
    Seq(("a", 2), ("b", 1)),
    Seq(("a", 1))
  )

  /*
  The output of the reduceByKeyAndWindow with inverse reduce function is
  different from the naive reduceByKeyAndWindow. Even if the count of a
  particular key is 0, the key does not get eliminated from the RDDs of
  ReducedWindowedDStream. This causes the number of keys in these RDDs to
  increase forever. A more generalized version that allows elimination of
  keys should be considered.
  */
  val bigOutputInv = Seq(
    Seq(("a", 1)),
    Seq(("a", 2), ("b", 1)),
    Seq(("a", 2), ("b", 2), ("c", 1)),
    Seq(("a", 2), ("b", 2), ("c", 1)),
    Seq(("a", 2), ("b", 1), ("c", 0)),
    Seq(("a", 1), ("b", 0), ("c", 0)),
    Seq(("a", 1), ("b", 0), ("c", 0)),
    Seq(("a", 2), ("b", 1), ("c", 0)),
    Seq(("a", 2), ("b", 2), ("c", 1)),
    Seq(("a", 2), ("b", 2), ("c", 1)),
    Seq(("a", 2), ("b", 1), ("c", 0)),
    Seq(("a", 1), ("b", 0), ("c", 0))
  )

  def testReduceByKeyAndWindow(
      name: String,
      input: Seq[Seq[(String, Int)]],
      expectedOutput: Seq[Seq[(String, Int)]],
      windowTime: Time = Seconds(2),
      slideTime: Time = Seconds(1)
    ) {
    test("reduceByKeyAndWindow - " + name) {
      testOperation(
        input,
        (s: DStream[(String, Int)]) => s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist(),
        expectedOutput,
        true
      )
    }
  }

  def testReduceByKeyAndWindowInv(
      name: String,
      input: Seq[Seq[(String, Int)]],
      expectedOutput: Seq[Seq[(String, Int)]],
      windowTime: Time = Seconds(2),
      slideTime: Time = Seconds(1)
    ) {
    test("reduceByKeyAndWindowInv - " + name) {
      testOperation(
        input,
        (s: DStream[(String, Int)]) => s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist(),
        expectedOutput,
        true
      )
    }
  }


  // Testing naive reduceByKeyAndWindow (without invertible function)

  testReduceByKeyAndWindow(
    "basic reduction",
    Seq( Seq(("a", 1), ("a", 3)) ),
    Seq( Seq(("a", 4)) )
  )

  testReduceByKeyAndWindow(
    "key already in window and new value added into window",
    Seq( Seq(("a", 1)), Seq(("a", 1)) ),
    Seq( Seq(("a", 1)), Seq(("a", 2)) )
  )

  testReduceByKeyAndWindow(
    "new key added into window",
    Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
    Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
  )

  testReduceByKeyAndWindow(
    "key removed from window",
    Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
    Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq() )
  )

  testReduceByKeyAndWindow(
    "larger slide time",
    largerSlideInput,
    largerSlideOutput,
    Seconds(4),
    Seconds(2)
  )

  testReduceByKeyAndWindow("big test", bigInput, bigOutput)


  // Testing reduceByKeyAndWindow (with invertible reduce function)

  testReduceByKeyAndWindowInv(
    "basic reduction",
    Seq( Seq(("a", 1), ("a", 3)) ),
    Seq( Seq(("a", 4)) )
  )

  testReduceByKeyAndWindowInv(
    "key already in window and new value added into window",
    Seq( Seq(("a", 1)), Seq(("a", 1)) ),
    Seq( Seq(("a", 1)), Seq(("a", 2)) )
  )

  testReduceByKeyAndWindowInv(
    "new key added into window",
    Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
    Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
  )

  testReduceByKeyAndWindowInv(
    "key removed from window",
    Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
    Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) )
  )

  testReduceByKeyAndWindowInv(
    "larger slide time",
    largerSlideInput,
    largerSlideOutput,
    Seconds(4),
    Seconds(2)
  )

  testReduceByKeyAndWindowInv("big test", bigInput, bigOutputInv)
}

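The expected vectors above can be regenerated mechanically: a window of two batches sliding by one, reduced per key with +. A sketch of that generator, checked here against the "key removed from window" case (illustrative; not part of the suite):

object WindowExpectedSketch {
  def expected(input: Seq[Seq[(String, Int)]], window: Int): Seq[Seq[(String, Int)]] =
    input.indices.map { i =>
      // Flatten the batches inside the window, then reduce per key.
      val inWindow = input.slice(math.max(0, i - window + 1), i + 1).flatten
      inWindow.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }.toList.sortBy(_._1)
    }

  def main(args: Array[String]): Unit = {
    val in = Seq(Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq())
    expected(in, 2).foreach(println)
    // List((a,1))
    // List((a,2))
    // List((a,1))
    // List()
  }
}

This matches the naive test's expected output; the inverse-reduce variant of the same case instead ends with ("a", 0), since (as the comment in the suite notes) keys are never eliminated once seen.
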