Added ssc.union

This commit is contained in:
Tathagata Das 2012-12-01 08:26:10 -08:00
parent 6fcd09f499
commit 62965c5d8e
2 changed files with 6 additions and 1 deletions

View file

@ -118,7 +118,8 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
if (seqOfValues(0).isEmpty) { if (seqOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist // If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) { if (newValues.isEmpty) {
throw new Exception("Neither previous window has value for key, nor new values found") val info = "seqOfValues =\n" + seqOfValues.map(x => "[" + x.mkString(",") + "]").mkString("\n")
throw new Exception("Neither previous window has value for key, nor new values found\n" + info)
} }
// Reduce the new values // Reduce the new values
newValues.reduce(reduceF) // return newValues.reduce(reduceF) // return

View file

@ -189,6 +189,10 @@ class StreamingContext private (
inputStream inputStream
} }
def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
new UnionDStream[T](streams.toArray)
}
/** /**
* This function registers a InputDStream as an input stream that will be * This function registers a InputDStream as an input stream that will be
* started (InputDStream.start() called) to get the input data streams. * started (InputDStream.start() called) to get the input data streams.