Updated groupByKeyAndWindow to be computed incrementally, and added mapSideCombine to combineByKeyAndWindow.

Tathagata Das 2013-12-26 02:58:29 -08:00
parent 94479673eb
commit 069cb14bdc
5 changed files with 34 additions and 12 deletions

PairDStreamFunctions.scala

@@ -108,8 +108,9 @@ extends Serializable {
       createCombiner: V => C,
       mergeValue: (C, V) => C,
       mergeCombiner: (C, C) => C,
-      partitioner: Partitioner) : DStream[(K, C)] = {
-    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner)
+      partitioner: Partitioner,
+      mapSideCombine: Boolean = true): DStream[(K, C)] = {
+    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
   }

   /**
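The extra mapSideCombine parameter mirrors the flag of the same name on the RDD-level combineByKey: with the default true, combiners are built per map partition before the shuffle; with false, raw values are shuffled and merged only on the reduce side. A minimal usage sketch, where the stream `pairs` and the helper name are hypothetical, and the StreamingContext._ implicits import reflects the API of this era:

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits
import org.apache.spark.streaming.dstream.DStream

def sumWithoutMapSideCombine(pairs: DStream[(String, Int)]): DStream[(String, Int)] =
  pairs.combineByKey[Int](
    (v: Int) => v,                   // createCombiner: first value seen for a key
    (acc: Int, v: Int) => acc + v,   // mergeValue: fold a value into the combiner
    (a: Int, b: Int) => a + b,       // mergeCombiner: merge two partial sums
    new HashPartitioner(4),
    mapSideCombine = false)          // shuffle raw values; merge reduce-side only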
@@ -173,7 +174,13 @@ extends Serializable {
       slideDuration: Duration,
       partitioner: Partitioner
     ): DStream[(K, Seq[V])] = {
-    self.window(windowDuration, slideDuration).groupByKey(partitioner)
+    val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
+    val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
+    val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
+    self.groupByKey(partitioner)
+        .window(windowDuration, slideDuration)
+        .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
+        .asInstanceOf[DStream[(K, Seq[V])]]
   }

   /**
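The reordering here is the incremental part: groupByKey now runs once per batch, and the window only concatenates the already-grouped per-batch Seq[V]s with cheap ArrayBuffer appends, instead of re-grouping every raw value in the window on each slide. The three combiner functions reduce to buffer concatenation; a self-contained sketch of that merge logic, in plain Scala with values specialized to Int and hypothetical names:

import scala.collection.mutable.ArrayBuffer

// Per-batch groups for one key, as the per-batch groupByKey would emit them.
val batch1: Seq[Int] = Seq(1, 2)
val batch2: Seq[Int] = Seq(3)

// The patch's combiner functions, specialized to Int.
val createCombiner = (v: Seq[Int]) => new ArrayBuffer[Int] ++= v
val mergeValue = (buf: ArrayBuffer[Int], v: Seq[Int]) => buf ++= v
val mergeCombiner = (b1: ArrayBuffer[Int], b2: ArrayBuffer[Int]) => b1 ++= b2

// A window covering both batches just appends the pre-grouped Seqs:
val windowed = mergeValue(createCombiner(batch1), batch2)
assert(windowed == ArrayBuffer(1, 2, 3))

The trailing asInstanceOf in the patch is then just a cast from DStream[(K, ArrayBuffer[V])] to DStream[(K, Seq[V])]; it is safe because ArrayBuffer[V] is a Seq[V], and only DStream's invariance in its type parameter forces the cast.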

JavaPairDStream.scala

@@ -155,7 +155,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   /**
    * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
-   * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more
+   * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
    * information.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
@@ -168,6 +168,22 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
   }

+  /**
+   * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
+   * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
+   * information.
+   */
+  def combineByKey[C](createCombiner: JFunction[V, C],
+      mergeValue: JFunction2[C, V, C],
+      mergeCombiners: JFunction2[C, C, C],
+      partitioner: Partitioner,
+      mapSideCombine: Boolean
+    ): JavaPairDStream[K, C] = {
+    implicit val cm: ClassTag[C] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+    dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine)
+  }
+
   /**
    * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
    * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
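The only subtlety in the new Java overload is the manufactured ClassTag: a Java caller cannot supply one, so the wrapper casts ClassTag[AnyRef] to ClassTag[C], which works because C is always a reference type when called from Java. A tiny sketch of that trick in isolation, with a hypothetical helper name:

import scala.reflect.ClassTag

// Same cast as in the patch: conjure a ClassTag for an unknown reference type.
def fakeClassTag[C]: ClassTag[C] =
  implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]

val tag = fakeClassTag[String]
println(tag.runtimeClass)  // class java.lang.Object, not java.lang.String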

ShuffledDStream.scala

@@ -29,8 +29,9 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiner: (C, C) => C,
-    partitioner: Partitioner
-  ) extends DStream [(K,C)] (parent.ssc) {
+    partitioner: Partitioner,
+    mapSideCombine: Boolean = true
+  ) extends DStream[(K,C)] (parent.ssc) {

   override def dependencies = List(parent)

@@ -38,8 +39,8 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
   override def compute(validTime: Time): Option[RDD[(K,C)]] = {
     parent.getOrCompute(validTime) match {
-      case Some(rdd) =>
-        Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
+      case Some(rdd) => Some(rdd.combineByKey[C](
+          createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
       case None => None
     }
   }
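compute keeps the standard per-batch shape: look up the parent's RDD for this batch time, apply the RDD-level operation if it exists, and pass None through when no batch was produced; the only behavioral change is forwarding mapSideCombine down to the RDD's combineByKey. A no-Spark toy of that Option pattern, with entirely hypothetical names and data:

// A parent batch may or may not exist for a given time.
def getOrCompute(time: Long): Option[Seq[(String, Int)]] =
  if (time % 2 == 0) Some(Seq(("a", 1), ("a", 2))) else None

def compute(time: Long): Option[Map[String, Int]] =
  getOrCompute(time) match {
    // Combine values per key for the batch that exists at this time...
    case Some(batch) =>
      Some(batch.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum })
    // ...and propagate the absence of a batch unchanged.
    case None => None
  }

assert(compute(0) == Some(Map("a" -> 3)))
assert(compute(1) == None)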

WindowedDStream.scala

@@ -39,7 +39,7 @@ class WindowedDStream[T: ClassTag](
     throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
       "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")

-  val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "false").toBoolean
+  val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "true").toBoolean

   parent.persist(StorageLevel.MEMORY_ONLY_SER)
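Flipping the spark.streaming.useNewUnion default to true makes the new union path the out-of-the-box way a window is assembled from its parent batches. Since the patch reads the flag via System.getProperty, a job can still opt out before the streaming context starts:

// Revert to the old union path; the property name and default are taken
// directly from the getProperty lookup in WindowedDStream.
System.setProperty("spark.streaming.useNewUnion", "false")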

WindowOperationsSuite.scala

@@ -225,9 +225,7 @@ class WindowOperationsSuite extends TestSuiteBase {
     val slideDuration = Seconds(1)
     val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
     val operation = (s: DStream[(String, Int)]) => {
-      s.groupByKeyAndWindow(windowDuration, slideDuration)
-        .map(x => (x._1, x._2.toSet))
-        .persist()
+      s.groupByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toSet))
     }
     testOperation(input, operation, expectedOutput, numBatches, true)
   }
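The test maps each windowed group to a Set because the ArrayBuffer-backed groups carry no ordering guarantee, and the incidental .persist() is dropped, presumably because nothing in the assertion needs it. For orientation, input and expectedOutput for this style of test have shapes along these lines (entirely hypothetical data, assuming batchDuration = Seconds(1) and windowDuration = Seconds(2)):

// One inner Seq per input batch; one inner Seq of (key, windowed-set) per output batch.
val input = Seq(
  Seq(("a", 1)),
  Seq(("a", 2), ("b", 1)),
  Seq(("b", 2)))
val expectedOutput = Seq(
  Seq(("a", Set(1))),
  Seq(("a", Set(1, 2)), ("b", Set(1))),
  Seq(("a", Set(2)), ("b", Set(1, 2))))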