Fixed bugs in streaming Scheduler and optimized QueueInputDStream.

Tathagata Das 2012-09-07 20:16:21 +00:00
parent b8e9e8ea78
commit b5750726ff
4 changed files with 9 additions and 5 deletions


@@ -94,7 +94,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
   def getStorageLevel = storageLevel
-  def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER): RDD[T] = {
+  def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER_2): RDD[T] = {
     if (!level.useDisk && level.replication < 2) {
       throw new Exception("Cannot checkpoint without using disk or replication (level requested was " + level + ")")
     }
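
The hunk above only swaps the default storage level that checkpoint() uses. A minimal caller-side sketch of what that means in practice; the object name, master URL, and data are illustrative and assume the 2012 spark.SparkContext API rather than anything shown in this commit:

import spark.SparkContext

object CheckpointExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "CheckpointExample")
    val data = sc.parallelize(1 to 1000)
    // With this commit, the no-argument call presumably defaults to
    // DISK_AND_MEMORY_DESER_2, i.e. the replicated (_2) variant, so the
    // checkpointed data keeps a second copy while still satisfying the
    // "disk or replication >= 2" guard shown in the hunk. An explicit
    // StorageLevel can still be passed to override the default.
    val checkpointed = data.checkpoint()
    println(checkpointed.count())
  }
}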


@@ -25,7 +25,11 @@ class QueueInputDStream[T: ClassManifest](
       buffer ++= queue
     }
     if (buffer.size > 0) {
-      Some(new UnionRDD(ssc.sc, buffer.toSeq))
+      if (oneAtATime) {
+        Some(buffer.first)
+      } else {
+        Some(new UnionRDD(ssc.sc, buffer.toSeq))
+      }
     } else if (defaultRDD != null) {
       Some(defaultRDD)
     } else {
@@ -33,4 +37,4 @@ class QueueInputDStream[T: ClassManifest](
}
}
}
}
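
The "optimized QueueInputDStream" half of the commit message shows up in the hunk above as a branch on oneAtATime: when only one queued RDD should be consumed per interval, the first buffered RDD is returned directly instead of being wrapped in a one-element UnionRDD. Below is a self-contained Scala sketch of just that selection logic; unionAll stands in for the new UnionRDD(ssc.sc, ...) construction and default for the defaultRDD fallback, so both names are illustrative:

import scala.collection.mutable.ArrayBuffer

// Mirrors the if/else introduced above: one RDD per interval versus a union of
// everything currently buffered.
def selectForInterval[T](buffer: ArrayBuffer[T],
                         oneAtATime: Boolean,
                         unionAll: Seq[T] => T,
                         default: Option[T]): Option[T] = {
  if (buffer.size > 0) {
    if (oneAtATime) Some(buffer.head)      // buffer.first in the diff: no wrapper needed
    else Some(unionAll(buffer.toSeq))      // previous behavior, kept for the batch case
  } else {
    default                                // plays the role of defaultRDD / None
  }
}

For example, selectForInterval(ArrayBuffer(1, 2, 3), oneAtATime = true, (_: Seq[Int]).sum, None) yields Some(1), while the same call with oneAtATime = false yields Some(6).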


@@ -26,7 +26,6 @@ extends Logging {
   val timer = new RecurringTimer(clock, ssc.batchDuration, generateRDDs(_))
 
   def start() {
-    SparkEnv.set(ssc.env)
     val zeroTime = Time(timer.start())
     outputStreams.foreach(_.initialize(zeroTime))
     inputStreams.par.foreach(_.start())
@@ -41,6 +40,7 @@ extends Logging {
   def generateRDDs (time: Time) {
     println("\n-----------------------------------------------------\n")
+    SparkEnv.set(ssc.env)
     logInfo("Generating RDDs for time " + time)
     outputStreams.foreach(outputStream => {
       outputStream.generateJob(time) match {
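
The Scheduler fix is about which thread sees the SparkEnv: generateRDDs is invoked by the RecurringTimer on its own thread, and SparkEnv is resolved per thread in this codebase, so the callback has to install the environment itself before generating jobs. A minimal sketch of that thread-local pattern, assuming SparkEnv.get/SparkEnv.set behave as thread-local accessors as in the 2012 code; the worker thread and its body are illustrative:

import spark.SparkEnv

val env = SparkEnv.get                       // captured on the thread that calls start()
val worker = new Thread(new Runnable {
  def run() {
    SparkEnv.set(env)                        // without this, this thread would not see the environment captured above
    // ... work that needs SparkEnv, e.g. generating and submitting the interval's jobs ...
  }
})
worker.start()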


@@ -7,7 +7,7 @@ import spark.SparkContext._
 class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
-    parent: DStream[(K, V)],
+    @transient parent: DStream[(K, V)],
     updateFunc: (Iterator[(K, Seq[V], S)]) => Iterator[(K, S)],
    partitioner: Partitioner,
    rememberPartitioner: Boolean
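
Marking parent as @transient keeps the upstream DStream out of Java serialization whenever a StateDStream instance is itself serialized, for example inside a closure shipped to worker nodes. A toy, self-contained illustration of what @transient does to a constructor field; Upstream and Stateful are stand-ins, not the real DStream types:

import java.io._

object TransientSketch extends App {
  class Upstream                                 // deliberately not Serializable, like a driver-side parent
  class Stateful(@transient val parent: Upstream, val name: String) extends Serializable

  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(new Stateful(new Upstream, "wordCounts"))
  out.close()

  val restored = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[Stateful]

  // The transient field is skipped on write and comes back as null; without
  // @transient, writeObject would throw NotSerializableException on Upstream.
  assert(restored.parent == null && restored.name == "wordCounts")
}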