Changed scheduler and file input stream to fix bugs in the driver fault tolerance. Added MasterFailureTest to rigorously test master fault tolerance with file input stream.

This commit is contained in:
Tathagata Das 2013-02-13 12:17:45 -08:00
parent fd90daf850
commit 39addd3803
18 changed files with 693 additions and 452 deletions

View file

@ -292,7 +292,7 @@ abstract class DStream[T: ClassManifest] (
* Generate a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
* (eg. ForEachDStream).
* to generate their own jobs.
*/
protected[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
@ -308,19 +308,18 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Dereference RDDs that are older than rememberDuration.
* Clear metadata that are older than `rememberDuration` of this DStream.
* This is an internal method that should not be called directly. This default
* implementation clears the old generated RDDs. Subclasses of DStream may override
* this to clear their own metadata along with the generated RDDs.
*/
protected[streaming] def forgetOldMetadata(time: Time) {
protected[streaming] def clearOldMetadata(time: Time) {
var numForgotten = 0
generatedRDDs.keys.foreach(t => {
if (t <= (time - rememberDuration)) {
generatedRDDs.remove(t)
numForgotten += 1
logInfo("Forgot RDD of time " + t + " from " + this)
}
})
logInfo("Forgot " + numForgotten + " RDDs from " + this)
dependencies.foreach(_.forgetOldMetadata(time))
val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
generatedRDDs --= oldRDDs.keys
logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
dependencies.foreach(_.clearOldMetadata(time))
}
/* Adds metadata to the Stream while it is running.

View file

@ -87,7 +87,7 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
}
override def toString() = {
"[\n" + checkpointFiles.size + "\n" + checkpointFiles.mkString("\n") + "\n]"
"[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]"
}
}

View file

@ -11,17 +11,20 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
private[streaming] var zeroTime: Time = null
private[streaming] var batchDuration: Duration = null
private[streaming] var rememberDuration: Duration = null
private[streaming] var checkpointInProgress = false
var rememberDuration: Duration = null
var checkpointInProgress = false
private[streaming] def start(time: Time) {
var zeroTime: Time = null
var startTime: Time = null
var batchDuration: Duration = null
def start(time: Time) {
this.synchronized {
if (zeroTime != null) {
throw new Exception("DStream graph computation already started")
}
zeroTime = time
startTime = time
outputStreams.foreach(_.initialize(zeroTime))
outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validate)
@ -29,19 +32,23 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
}
private[streaming] def stop() {
def restart(time: Time) {
this.synchronized { startTime = time }
}
def stop() {
this.synchronized {
inputStreams.par.foreach(_.stop())
}
}
private[streaming] def setContext(ssc: StreamingContext) {
def setContext(ssc: StreamingContext) {
this.synchronized {
outputStreams.foreach(_.setContext(ssc))
}
}
private[streaming] def setBatchDuration(duration: Duration) {
def setBatchDuration(duration: Duration) {
this.synchronized {
if (batchDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
@ -51,61 +58,61 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
batchDuration = duration
}
private[streaming] def remember(duration: Duration) {
def remember(duration: Duration) {
this.synchronized {
if (rememberDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
". cannot set it again.")
}
rememberDuration = duration
}
rememberDuration = duration
}
private[streaming] def addInputStream(inputStream: InputDStream[_]) {
def addInputStream(inputStream: InputDStream[_]) {
this.synchronized {
inputStream.setGraph(this)
inputStreams += inputStream
}
}
private[streaming] def addOutputStream(outputStream: DStream[_]) {
def addOutputStream(outputStream: DStream[_]) {
this.synchronized {
outputStream.setGraph(this)
outputStreams += outputStream
}
}
private[streaming] def getInputStreams() = this.synchronized { inputStreams.toArray }
def getInputStreams() = this.synchronized { inputStreams.toArray }
private[streaming] def getOutputStreams() = this.synchronized { outputStreams.toArray }
def getOutputStreams() = this.synchronized { outputStreams.toArray }
private[streaming] def generateRDDs(time: Time): Seq[Job] = {
def generateRDDs(time: Time): Seq[Job] = {
this.synchronized {
logInfo("Generating RDDs for time " + time)
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
}
}
private[streaming] def forgetOldRDDs(time: Time) {
def clearOldMetadata(time: Time) {
this.synchronized {
logInfo("Forgetting old RDDs for time " + time)
outputStreams.foreach(_.forgetOldMetadata(time))
logInfo("Clearing old metadata for time " + time)
outputStreams.foreach(_.clearOldMetadata(time))
}
}
private[streaming] def updateCheckpointData(time: Time) {
def updateCheckpointData(time: Time) {
this.synchronized {
outputStreams.foreach(_.updateCheckpointData(time))
}
}
private[streaming] def restoreCheckpointData() {
def restoreCheckpointData() {
this.synchronized {
outputStreams.foreach(_.restoreCheckpointData())
}
}
private[streaming] def validate() {
def validate() {
this.synchronized {
assert(batchDuration != null, "Batch duration has not been set")
//assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low")

View file

@ -38,13 +38,19 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
logInfo("Added " + job + " to queue")
}
def stop() {
jobExecutor.shutdown()
}
private def clearJob(job: Job) {
jobs.synchronized {
val jobsOfTime = jobs.get(job.time)
val time = job.time
val jobsOfTime = jobs.get(time)
if (jobsOfTime.isDefined) {
jobsOfTime.get -= job
if (jobsOfTime.get.isEmpty) {
jobs -= job.time
ssc.scheduler.clearOldMetadata(time)
jobs -= time
}
} else {
throw new Exception("Job finished for time " + job.time +

View file

@ -9,11 +9,8 @@ class Scheduler(ssc: StreamingContext) extends Logging {
initLogging()
val graph = ssc.graph
val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
new CheckpointWriter(ssc.checkpointDir)
} else {
@ -24,53 +21,80 @@ class Scheduler(ssc: StreamingContext) extends Logging {
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => generateRDDs(new Time(longTime)))
val graph = ssc.graph
def start() {
// If context was started from checkpoint, then restart timer such that
// this timer's triggers occur at the same time as the original timer.
// Otherwise just start the timer from scratch, and initialize graph based
// on this first trigger time of the timer.
def start() = synchronized {
if (ssc.isCheckpointPresent) {
// If manual clock is being used for testing, then
// either set the manual clock to the last checkpointed time,
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
// Reschedule the batches that were received but not processed before failure
//ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time))
val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
println(pendingTimes.mkString(", "))
pendingTimes.foreach(time =>
graph.generateRDDs(time).foreach(jobManager.runJob))
// Restart the timer
timer.restart(graph.zeroTime.milliseconds)
logInfo("Scheduler's timer restarted")
restart()
} else {
val firstTime = new Time(timer.start())
graph.start(firstTime - ssc.graph.batchDuration)
logInfo("Scheduler's timer started")
startFirstTime()
}
logInfo("Scheduler started")
}
def stop() {
def stop() = synchronized {
timer.stop()
graph.stop()
jobManager.stop()
ssc.graph.stop()
logInfo("Scheduler stopped")
}
private def generateRDDs(time: Time) {
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
logInfo("Scheduler's timer started at " + startTime)
}
private def restart() {
// If manual clock is being used for testing, then
// either set the manual clock to the last checkpointed time,
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
val batchDuration = ssc.graph.batchDuration
// Batches when the master was down, that is,
// between the checkpoint and current restart time
val checkpointTime = ssc.initialCheckpoint.checkpointTime
val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
val downTimes = checkpointTime.until(restartTime, batchDuration)
logInfo("Batches during down time: " + downTimes.mkString(", "))
// Batches that were unprocessed before failure
val pendingTimes = ssc.initialCheckpoint.pendingTimes
logInfo("Batches pending processing: " + pendingTimes.mkString(", "))
// Reschedule jobs for these times
val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
timesToReschedule.foreach(time =>
graph.generateRDDs(time).foreach(jobManager.runJob)
)
// Restart the timer
timer.start(restartTime.milliseconds)
logInfo("Scheduler's timer restarted")
}
/** Generates the RDDs, clears old metadata and does checkpoint for the given time */
def generateRDDs(time: Time) {
SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
graph.generateRDDs(time).foreach(jobManager.runJob)
graph.forgetOldRDDs(time)
doCheckpoint(time)
}
private def doCheckpoint(time: Time) {
def clearOldMetadata(time: Time) {
ssc.graph.clearOldMetadata(time)
}
def doCheckpoint(time: Time) {
if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
val startTime = System.currentTimeMillis()

View file

@ -37,6 +37,16 @@ case class Time(private val millis: Long) {
def max(that: Time): Time = if (this > that) this else that
def until(that: Time, interval: Duration): Seq[Time] = {
assert(that > this, "Cannot create sequence as " + that + " not more than " + this)
assert(
(that - this).isMultipleOf(interval),
"Cannot create sequence as gap between " + that + " and " +
this + " is not multiple of " + interval
)
(this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
}
override def toString: String = (millis.toString + " ms")
}

View file

@ -21,19 +21,21 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
// Latest file mod time seen till any point of time
private val lastModTimeFiles = new HashSet[String]()
private var lastModTime = 0L
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
@transient private var files = new HashMap[Time, Array[String]]
@transient private[streaming] var files = new HashMap[Time, Array[String]]
override def start() {
if (newFilesOnly) {
lastModTime = System.currentTimeMillis()
lastModTime = graph.zeroTime.milliseconds
} else {
lastModTime = 0
}
logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
}
override def stop() { }
@ -43,38 +45,50 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
* a union RDD out of them. Note that this maintains the list of files that were processed
* in the latest modification time in the previous call to this method. This is because the
* modification time returned by the FileStatus API seems to return times only at the
* granularity of seconds. Hence, new files may have the same modification time as the
* latest modification time in the previous call to this method and the list of files
* maintained is used to filter the one that have been processed.
* granularity of seconds. And new files may have the same modification time as the
* latest modification time in the previous call to this method yet was not reported in
* the previous call.
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime)
// Create the filter for selecting new files
val newFilter = new PathFilter() {
// Latest file mod time seen in this round of fetching files and its corresponding files
var latestModTime = 0L
val latestModTimeFiles = new HashSet[String]()
def accept(path: Path): Boolean = {
if (!filter(path)) {
if (!filter(path)) { // Reject file if it does not satisfy filter
logDebug("Rejected by filter " + path)
return false
} else {
} else { // Accept file only if
val modTime = fs.getFileStatus(path).getModificationTime()
if (modTime < lastModTime){
return false
logDebug("Mod time for " + path + " is " + modTime)
if (modTime < lastModTime) {
logDebug("Mod time less than last mod time")
return false // If the file was created before the last time it was called
} else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
return false
logDebug("Mod time equal to last mod time, but file considered already")
return false // If the file was created exactly as lastModTime but not reported yet
} else if (modTime > validTime.milliseconds) {
logDebug("Mod time more than valid time")
return false // If the file was created after the time this function call requires
}
if (modTime > latestModTime) {
latestModTime = modTime
latestModTimeFiles.clear()
logDebug("Latest mod time updated to " + latestModTime)
}
latestModTimeFiles += path.toString
logDebug("Accepted " + path)
return true
}
}
}
logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
logInfo("New files: " + newFiles.mkString(", "))
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
if (newFiles.length > 0) {
// Update the modification time and the files processed for that modification time
if (lastModTime != newFilter.latestModTime) {
@ -82,17 +96,21 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
lastModTimeFiles.clear()
}
lastModTimeFiles ++= newFilter.latestModTimeFiles
logDebug("Last mod time updated to " + lastModTime)
}
files += ((validTime, newFiles))
Some(filesToRDD(newFiles))
}
/** Forget the old time-to-files mappings along with old RDDs */
protected[streaming] override def forgetOldMetadata(time: Time) {
super.forgetOldMetadata(time)
val filesToBeRemoved = files.filter(_._1 <= (time - rememberDuration))
files --= filesToBeRemoved.keys
logInfo("Forgot " + filesToBeRemoved.size + " files from " + this)
/** Clear the old time-to-files mappings along with old RDDs */
protected[streaming] override def clearOldMetadata(time: Time) {
super.clearOldMetadata(time)
val oldFiles = files.filter(_._1 <= (time - rememberDuration))
files --= oldFiles.keys
logInfo("Cleared " + oldFiles.size + " old files that were older than " +
(time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
logDebug("Cleared files are:\n" +
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
}
/** Generate one RDD from an array of files */
@ -148,6 +166,11 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
}
}
override def toString() = {
"[\n" + hadoopFiles.size + " file sets\n" +
hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
}
}
}

View file

@ -46,8 +46,15 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
def stop() {}
override def compute(validTime: Time): Option[RDD[T]] = {
val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
Some(new BlockRDD[T](ssc.sc, blockIds))
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure forces
if (validTime >= graph.startTime) {
val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[String]()))
}
}
}

View file

@ -0,0 +1,375 @@
package spark.streaming.util
import spark.{Logging, RDD}
import spark.streaming._
import spark.streaming.dstream.ForEachDStream
import StreamingContext._
import scala.util.Random
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import java.io.{File, ObjectInputStream, IOException}
import java.util.UUID
import com.google.common.io.Files
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileUtil, FileSystem, Path}
import org.apache.hadoop.conf.Configuration
private[streaming]
object MasterFailureTest extends Logging {
initLogging()
@volatile var killed = false
@volatile var killCount = 0
def main(args: Array[String]) {
if (args.size < 2) {
println(
"Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]")
System.exit(1)
}
val directory = args(0)
val numBatches = args(1).toInt
val batchDuration = if (args.size > 2) Milliseconds(args(2).toInt) else Seconds(1)
println("\n\n========================= MAP TEST =========================\n\n")
testMap(directory, numBatches, batchDuration)
println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n")
testUpdateStateByKey(directory, numBatches, batchDuration)
}
def testMap(directory: String, numBatches: Int, batchDuration: Duration) {
// Input: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ...
val input = (1 to numBatches).map(_.toString).toSeq
// Expected output: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ...
val expectedOutput = (1 to numBatches)
val operation = (st: DStream[String]) => st.map(_.toInt)
// Run streaming operation with multiple master failures
val output = testOperation(directory, batchDuration, input, operation, expectedOutput)
logInfo("Expected output, size = " + expectedOutput.size)
logInfo(expectedOutput.mkString("[", ",", "]"))
logInfo("Output, size = " + output.size)
logInfo(output.mkString("[", ",", "]"))
// Verify whether all the values of the expected output is present
// in the output
assert(output.distinct.toSet == expectedOutput.toSet)
}
def testUpdateStateByKey(directory: String, numBatches: Int, batchDuration: Duration) {
// Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
val input = (1 to numBatches).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
// Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
val expectedOutput = (1L to numBatches).map(i => (1L to i).reduce(_ + _)).map(j => ("a", j))
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Long], state: Option[Long]) => {
Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L))
}
st.flatMap(_.split(" "))
.map(x => (x, 1L))
.updateStateByKey[Long](updateFunc)
.checkpoint(batchDuration * 5)
}
// Run streaming operation with multiple master failures
val output = testOperation(directory, batchDuration, input, operation, expectedOutput)
logInfo("Expected output, size = " + expectedOutput.size + "\n" + expectedOutput)
logInfo("Output, size = " + output.size + "\n" + output)
// Verify whether all the values in the output are among the expected output values
output.foreach(o =>
assert(expectedOutput.contains(o), "Expected value " + o + " not found")
)
// Verify whether the last expected output value has been generated, there by
// confirming that none of the inputs have been missed
assert(output.last == expectedOutput.last)
}
/**
* Tests stream operation with multiple master failures, and verifies whether the
* final set of output values is as expected or not.
*/
def testOperation[T: ClassManifest](
directory: String,
batchDuration: Duration,
input: Seq[String],
operation: DStream[String] => DStream[T],
expectedOutput: Seq[T]
): Seq[T] = {
// Just making sure that the expected output does not have duplicates
assert(expectedOutput.distinct.toSet == expectedOutput.toSet)
// Setup the stream computation with the given operation
val (ssc, checkpointDir, testDir) = setupStreams(directory, batchDuration, operation)
// Start generating files in the a different thread
val fileGeneratingThread = new FileGeneratingThread(input, testDir, batchDuration.milliseconds)
fileGeneratingThread.start()
// Run the streams and repeatedly kill it until the last expected output
// has been generated, or until it has run for twice the expected time
val lastExpectedOutput = expectedOutput.last
val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2
val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun)
// Delete directories
fileGeneratingThread.join()
val fs = checkpointDir.getFileSystem(new Configuration())
fs.delete(checkpointDir, true)
fs.delete(testDir, true)
logInfo("Finished test after " + killCount + " failures")
mergedOutput
}
/**
* Sets up the stream computation with the given operation, directory (local or HDFS),
* and batch duration. Returns the streaming context and the directory to which
* files should be written for testing.
*/
private def setupStreams[T: ClassManifest](
directory: String,
batchDuration: Duration,
operation: DStream[String] => DStream[T]
): (StreamingContext, Path, Path) = {
// Reset all state
reset()
// Create the directories for this test
val uuid = UUID.randomUUID().toString
val rootDir = new Path(directory, uuid)
val fs = rootDir.getFileSystem(new Configuration())
val checkpointDir = new Path(rootDir, "checkpoint")
val testDir = new Path(rootDir, "test")
fs.mkdirs(checkpointDir)
fs.mkdirs(testDir)
// Setup the streaming computation with the given operation
System.clearProperty("spark.driver.port")
var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration)
ssc.checkpoint(checkpointDir.toString)
val inputStream = ssc.textFileStream(testDir.toString)
val operatedStream = operation(inputStream)
val outputStream = new TestOutputStream(operatedStream)
ssc.registerOutputStream(outputStream)
(ssc, checkpointDir, testDir)
}
/**
* Repeatedly starts and kills the streaming context until timed out or
* the last expected output is generated. Finally, return
*/
private def runStreams[T: ClassManifest](
ssc_ : StreamingContext,
lastExpectedOutput: T,
maxTimeToRun: Long
): Seq[T] = {
var ssc = ssc_
var totalTimeRan = 0L
var isLastOutputGenerated = false
var isTimedOut = false
val mergedOutput = new ArrayBuffer[T]()
val checkpointDir = ssc.checkpointDir
var batchDuration = ssc.graph.batchDuration
while(!isLastOutputGenerated && !isTimedOut) {
// Get the output buffer
val outputBuffer = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[T]].output
def output = outputBuffer.flatMap(x => x)
// Start the thread to kill the streaming after some time
killed = false
val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10)
killingThread.start()
var timeRan = 0L
try {
// Start the streaming computation and let it run while ...
// (i) StreamingContext has not been shut down yet
// (ii) The last expected output has not been generated yet
// (iii) Its not timed out yet
System.clearProperty("spark.streaming.clock")
System.clearProperty("spark.driver.port")
ssc.start()
val startTime = System.currentTimeMillis()
while (!killed && !isLastOutputGenerated && !isTimedOut) {
Thread.sleep(100)
timeRan = System.currentTimeMillis() - startTime
isLastOutputGenerated = (!output.isEmpty && output.last == lastExpectedOutput)
isTimedOut = (timeRan + totalTimeRan > maxTimeToRun)
}
} catch {
case e: Exception => logError("Error running streaming context", e)
}
if (killingThread.isAlive) killingThread.interrupt()
ssc.stop()
logInfo("Has been killed = " + killed)
logInfo("Is last output generated = " + isLastOutputGenerated)
logInfo("Is timed out = " + isTimedOut)
// Verify whether the output of each batch has only one element or no element
// and then merge the new output with all the earlier output
mergedOutput ++= output
totalTimeRan += timeRan
logInfo("New output = " + output)
logInfo("Merged output = " + mergedOutput)
logInfo("Time ran = " + timeRan)
logInfo("Total time ran = " + totalTimeRan)
if (!isLastOutputGenerated && !isTimedOut) {
val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 10)
logInfo(
"\n-------------------------------------------\n" +
" Restarting stream computation in " + sleepTime + " ms " +
"\n-------------------------------------------\n"
)
Thread.sleep(sleepTime)
// Recreate the streaming context from checkpoint
ssc = new StreamingContext(checkpointDir)
}
}
mergedOutput
}
/**
* Verifies the output value are the same as expected. Since failures can lead to
* a batch being processed twice, a batches output may appear more than once
* consecutively. To avoid getting confused with those, we eliminate consecutive
* duplicate batch outputs of values from the `output`. As a result, the
* expected output should not have consecutive batches with the same values as output.
*/
private def verifyOutput[T: ClassManifest](output: Seq[T], expectedOutput: Seq[T]) {
// Verify whether expected outputs do not consecutive batches with same output
for (i <- 0 until expectedOutput.size - 1) {
assert(expectedOutput(i) != expectedOutput(i+1),
"Expected output has consecutive duplicate sequence of values")
}
// Log the output
println("Expected output, size = " + expectedOutput.size)
println(expectedOutput.mkString("[", ",", "]"))
println("Output, size = " + output.size)
println(output.mkString("[", ",", "]"))
// Match the output with the expected output
output.foreach(o =>
assert(expectedOutput.contains(o), "Expected value " + o + " not found")
)
}
/** Resets counter to prepare for the test */
private def reset() {
killed = false
killCount = 0
}
}
/**
* This is a output stream just for testing. All the output is collected into a
* ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
*/
private[streaming]
class TestOutputStream[T: ClassManifest](
parent: DStream[T],
val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
) extends ForEachDStream[T](
parent,
(rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
println(t + ": " + collected.mkString("[", ",", "]"))
}
) {
// This is to clear the output buffer every it is read from a checkpoint
@throws(classOf[IOException])
private def readObject(ois: ObjectInputStream) {
ois.defaultReadObject()
output.clear()
}
}
/**
* Thread to kill streaming context after a random period of time.
*/
private[streaming]
class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
initLogging()
override def run() {
try {
// If it is the first killing, then allow the first checkpoint to be created
var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 1000
val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
logInfo("Kill wait time = " + killWaitTime)
Thread.sleep(killWaitTime)
logInfo(
"\n---------------------------------------\n" +
"Killing streaming context after " + killWaitTime + " ms" +
"\n---------------------------------------\n"
)
if (ssc != null) {
ssc.stop()
MasterFailureTest.killed = true
MasterFailureTest.killCount += 1
}
logInfo("Killing thread finished normally")
} catch {
case ie: InterruptedException => logInfo("Killing thread interrupted")
case e: Exception => logWarning("Exception in killing thread", e)
}
}
}
/**
* Thread to generate input files periodically with the desired text.
*/
private[streaming]
class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
extends Thread with Logging {
initLogging()
override def run() {
val localTestDir = Files.createTempDir()
val fs = testDir.getFileSystem(new Configuration())
try {
Thread.sleep(5000) // To make sure that all the streaming context has been set up
for (i <- 0 until input.size) {
// Write the data to a local file and then move it to the target test directory
val localFile = new File(localTestDir, (i+1).toString)
val hadoopFile = new Path(testDir, (i+1).toString)
FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
//fs.moveFromLocalFile(new Path(localFile.toString), new Path(testDir, i.toString))
fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
Thread.sleep(interval)
localFile.delete()
}
logInfo("File generating thread finished normally")
} catch {
case ie: InterruptedException => logInfo("File generating thread interrupted")
case e: Exception => logWarning("File generating in killing thread", e)
} finally {
fs.close()
}
}
}

View file

@ -3,9 +3,9 @@ package spark.streaming.util
private[streaming]
class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
val minPollTime = 25L
private val minPollTime = 25L
val pollTime = {
private val pollTime = {
if (period / 10.0 > minPollTime) {
(period / 10.0).toLong
} else {
@ -13,11 +13,20 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
}
}
val thread = new Thread() {
private val thread = new Thread() {
override def run() { loop }
}
var nextTime = 0L
private var nextTime = 0L
def getStartTime(): Long = {
(math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
}
def getRestartTime(originalStartTime: Long): Long = {
val gap = clock.currentTime - originalStartTime
(math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
}
def start(startTime: Long): Long = {
nextTime = startTime
@ -26,21 +35,14 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
}
def start(): Long = {
val startTime = (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
start(startTime)
start(getStartTime())
}
def restart(originalStartTime: Long): Long = {
val gap = clock.currentTime - originalStartTime
val newStartTime = (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
start(newStartTime)
}
def stop() {
def stop() {
thread.interrupt()
}
def loop() {
private def loop() {
try {
while (true) {
clock.waitTillTime(nextTime)

View file

@ -33,7 +33,8 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() {
ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock");
ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
ssc.checkpoint("checkpoint", new Duration(1000));
}
@ -45,7 +46,7 @@ public class JavaAPISuite implements Serializable {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port");
}
/*
@Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
@ -434,7 +435,7 @@ public class JavaAPISuite implements Serializable {
assertOrderInvariantEquals(expected, result);
}
*/
/*
* Performs an order-invariant comparison of lists representing two RDD streams. This allows
* us to account for ordering variation within individual RDD's which occurs during windowing.
@ -450,7 +451,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, actual);
}
/*
// PairDStream Functions
@Test
public void testPairFilter() {
@ -897,7 +898,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
*/
@Test
public void testCheckpointMasterRecovery() throws InterruptedException {
List<List<String>> inputData = Arrays.asList(
@ -964,7 +965,7 @@ public class JavaAPISuite implements Serializable {
assertOrderInvariantEquals(expected, result1);
}
*/
/*
// Input stream tests. These mostly just test that we can instantiate a given InputStream with
// Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
// InputStream functionality is deferred to the existing Scala tests.
@ -972,9 +973,9 @@ public class JavaAPISuite implements Serializable {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics);
JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets);
JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets,
JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets);
JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets,
StorageLevel.MEMORY_AND_DISK());
}
@ -1026,5 +1027,5 @@ public class JavaAPISuite implements Serializable {
public void testFileStream() {
JavaPairDStream<String, String> foo =
ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
}*/
}
}

View file

@ -1,6 +1,7 @@
# Set everything to be logged to the file streaming/target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.rootCategory=WARN, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.ConsoleAppender
log4j.appender.file.append=false
log4j.appender.file.file=streaming/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
@ -8,4 +9,6 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}:
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.spark.streaming=INFO
log4j.logger.spark.streaming.dstream.FileInputDStream=DEBUG

View file

@ -6,6 +6,8 @@ import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
override def framework() = "BasicOperationsSuite"
after {

View file

@ -1,5 +1,6 @@
package spark.streaming
import dstream.FileInputDStream
import spark.streaming.StreamingContext._
import java.io.File
import runtime.RichInt
@ -10,8 +11,16 @@ import util.{Clock, ManualClock}
import scala.util.Random
import com.google.common.io.Files
/**
* This test suites tests the checkpointing functionality of DStreams -
* the checkpointing of a DStream's RDDs as well as the checkpointing of
* the whole DStream graph.
*/
class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
before {
FileUtils.deleteDirectory(new File(checkpointDir))
}
@ -64,7 +73,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a time such that at least one RDD in the stream should have been checkpointed,
// then check whether some RDD has been checkpointed or not
ssc.start()
runStreamsWithRealDelay(ssc, firstNumBatches)
advanceTimeWithRealDelay(ssc, firstNumBatches)
logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.checkpointFiles.foreach {
@ -77,7 +86,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a further time such that previous checkpoint files in the stream would be deleted
// and check whether the earlier checkpoint files are deleted
val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
runStreamsWithRealDelay(ssc, secondNumBatches)
advanceTimeWithRealDelay(ssc, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
@ -92,7 +101,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run one batch to generate a new checkpoint file and check whether some RDD
// is present in the checkpoint data or not
ssc.start()
runStreamsWithRealDelay(ssc, 1)
advanceTimeWithRealDelay(ssc, 1)
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
@ -113,7 +122,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Adjust manual clock time as if it is being restarted after a delay
System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
ssc.start()
runStreamsWithRealDelay(ssc, 4)
advanceTimeWithRealDelay(ssc, 4)
ssc.stop()
System.clearProperty("spark.streaming.manualClock.jump")
ssc = null
@ -168,74 +177,95 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
// This tests whether file input stream remembers what files were seen before
// the master failure and uses them again to process a large window operatoin.
// the master failure and uses them again to process a large window operation.
// It also tests whether batches, whose processing was incomplete due to the
// failure, are re-processed or not.
test("recovery with file input stream") {
// Disable manual clock as FileInputDStream does not work with manual clock
val clockProperty = System.getProperty("spark.streaming.clock")
System.clearProperty("spark.streaming.clock")
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
var ssc = new StreamingContext(master, framework, batchDuration)
var ssc = new StreamingContext(master, framework, Seconds(1))
ssc.checkpoint(checkpointDir, checkpointInterval)
val fileStream = ssc.textFileStream(testDir.toString)
// Making value 3 take large time to process, to ensure that the master
// shuts down in the middle of processing the 3rd batch
val mappedStream = fileStream.map(s => {
val i = s.toInt
if (i == 3) Thread.sleep(1000)
if (i == 3) Thread.sleep(2000)
i
})
// Reducing over a large window to ensure that recovery from master failure
// requires reprocessing of all the files seen before the failure
val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
val outputBuffer = new ArrayBuffer[Seq[Int]]
var outputStream = new TestOutputStream(reducedStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
// Create files and advance manual clock to process them
var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
//var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
// wait to make sure that the file is written such that it gets shown in the file listings
Thread.sleep(500)
clock.addToTime(batchDuration.milliseconds)
// wait to make sure that FileInputDStream picks up this file only and not any other file
Thread.sleep(500)
Thread.sleep(1000)
}
logInfo("Output = " + outputStream.output.mkString(","))
assert(outputStream.output.size > 0, "No files processed before restart")
ssc.stop()
// Verify whether files created have been recorded correctly or not
var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
def recordedFiles = fileInputDStream.files.values.flatMap(x => x)
assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
// Create files while the master is down
for (i <- Seq(4, 5, 6)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Thread.sleep(1000)
}
// Restart stream computation from checkpoint and create more files to see whether
// they are being processed
// Recover context from checkpoint file and verify whether the files that were
// recorded before failure were saved and successfully recovered
logInfo("*********** RESTARTING ************")
ssc = new StreamingContext(checkpointDir)
fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
// Restart stream computation
ssc.start()
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
for (i <- Seq(7, 8, 9)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Thread.sleep(500)
clock.addToTime(batchDuration.milliseconds)
Thread.sleep(500)
Thread.sleep(1000)
}
Thread.sleep(1000)
logInfo("Output = " + outputStream.output.mkString(","))
logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()
// Verify whether files created while the driver was down have been recorded or not
assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
// Verify whether new files created after recover have been recorded or not
assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
// Append the new output to the old buffer
outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
outputBuffer ++= outputStream.output
// Verify whether data received by Spark Streaming was as expected
val expectedOutput = Seq(1, 3, 6, 28, 36, 45)
val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
logInfo("--------------------------------")
logInfo("output, size = " + outputBuffer.size)
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
@ -244,11 +274,17 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
logInfo("--------------------------------")
// Verify whether all the elements received are as expected
assert(outputBuffer.size === expectedOutput.size)
for (i <- 0 until outputBuffer.size) {
assert(outputBuffer(i).size === 1)
assert(outputBuffer(i).head === expectedOutput(i))
}
val output = outputBuffer.flatMap(x => x)
assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed
output.foreach(o => // To ensure all the inputs are correctly added cumulatively
assert(expectedOutput.contains(o), "Expected value " + o + " not found")
)
// To ensure that all the inputs were received correctly
assert(expectedOutput.last === output.last)
// Enable manual clock back again for other tests
if (clockProperty != null)
System.setProperty("spark.streaming.clock", clockProperty)
}
@ -278,7 +314,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Do the computation for initial number of batches, create checkpoint file and quit
ssc = setupStreams[U, V](input, operation)
val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
ssc.start()
val output = advanceTimeWithRealDelay[V](ssc, initialNumBatches)
ssc.stop()
verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
Thread.sleep(1000)
@ -289,17 +327,20 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
"\n-------------------------------------------\n"
)
ssc = new StreamingContext(checkpointDir)
val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs)
System.clearProperty("spark.driver.port")
ssc.start()
val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
// the first element will be re-processed data of the last batch before restart
verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
ssc.stop()
ssc = null
}
/**
* Advances the manual clock on the streaming scheduler by given number of batches.
* It also wait for the expected amount of time for each batch.
* It also waits for the expected amount of time for each batch.
*/
def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) {
def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
for (i <- 1 to numBatches.toInt) {
@ -308,6 +349,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
logInfo("Manual clock after advancing = " + clock.time)
Thread.sleep(batchDuration.milliseconds)
}
val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
outputStream.output
}
}

View file

@ -1,14 +1,15 @@
package spark.streaming
import org.scalatest.{FunSuite, BeforeAndAfter}
import org.apache.commons.io.FileUtils
import java.io.File
import scala.runtime.RichInt
import scala.util.Random
import spark.streaming.StreamingContext._
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import spark.Logging
import spark.streaming.util.MasterFailureTest
import StreamingContext._
import org.scalatest.{FunSuite, BeforeAndAfter}
import com.google.common.io.Files
import java.io.File
import org.apache.commons.io.FileUtils
import collection.mutable.ArrayBuffer
/**
* This testsuite tests master failures at random times while the stream is running using
@ -16,295 +17,24 @@ import com.google.common.io.Files
*/
class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
var testDir: File = null
var checkpointDir: File = null
val batchDuration = Milliseconds(500)
var directory = "FailureSuite"
val numBatches = 30
val batchDuration = Milliseconds(1000)
before {
testDir = Files.createTempDir()
checkpointDir = Files.createTempDir()
FileUtils.deleteDirectory(new File(directory))
}
after {
FailureSuite.reset()
FileUtils.deleteDirectory(checkpointDir)
FileUtils.deleteDirectory(testDir)
FileUtils.deleteDirectory(new File(directory))
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
test("multiple failures with map") {
MasterFailureTest.testMap(directory, numBatches, batchDuration)
}
test("multiple failures with updateStateByKey") {
val n = 30
// Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
// Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j))
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
}
st.flatMap(_.split(" "))
.map(x => (x, 1))
.updateStateByKey[RichInt](updateFunc)
.checkpoint(Seconds(2))
.map(t => (t._1, t._2.self))
}
testOperationWithMultipleFailures(input, operation, expectedOutput)
}
test("multiple failures with reduceByKeyAndWindow") {
val n = 30
val w = 100
assert(w > n, "Window should be much larger than the number of input sets in this test")
// Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
// Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j))
val operation = (st: DStream[String]) => {
st.flatMap(_.split(" "))
.map(x => (x, 1))
.reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
.checkpoint(Seconds(2))
}
testOperationWithMultipleFailures(input, operation, expectedOutput)
}
/**
* Tests stream operation with multiple master failures, and verifies whether the
* final set of output values is as expected or not. Checking the final value is
* proof that no intermediate data was lost due to master failures.
*/
def testOperationWithMultipleFailures(
input: Seq[String],
operation: DStream[String] => DStream[(String, Int)],
expectedOutput: Seq[(String, Int)]
) {
var ssc = setupStreamsWithFileStream(operation)
val mergedOutput = new ArrayBuffer[(String, Int)]()
val lastExpectedOutput = expectedOutput.last
val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2
var totalTimeRan = 0L
// Start generating files in the a different thread
val fileGeneratingThread = new FileGeneratingThread(input, testDir.getPath, batchDuration.milliseconds)
fileGeneratingThread.start()
// Repeatedly start and kill the streaming context until timed out or
// all expected output is generated
while(!FailureSuite.outputGenerated && !FailureSuite.timedOut) {
// Start the thread to kill the streaming after some time
FailureSuite.failed = false
val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10)
killingThread.start()
// Run the streams with real clock until last expected output is seen or timed out
val (output, timeRan) = runStreamsWithRealClock(ssc, lastExpectedOutput, maxTimeToRun - totalTimeRan)
if (killingThread.isAlive) killingThread.interrupt()
// Merge output and time ran and see whether already timed out or not
mergedOutput ++= output
totalTimeRan += timeRan
logInfo("New output = " + output)
logInfo("Merged output = " + mergedOutput)
logInfo("Total time spent = " + totalTimeRan)
if (totalTimeRan > maxTimeToRun) {
FailureSuite.timedOut = true
}
if (!FailureSuite.outputGenerated && !FailureSuite.timedOut) {
val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 2)
logInfo(
"\n-------------------------------------------\n" +
" Restarting stream computation in " + sleepTime + " ms " +
"\n-------------------------------------------\n"
)
Thread.sleep(sleepTime)
}
// Recreate the streaming context from checkpoint
ssc = new StreamingContext(checkpointDir.getPath)
}
ssc.stop()
ssc = null
logInfo("Finished test after " + FailureSuite.failureCount + " failures")
if (FailureSuite.timedOut) {
logWarning("Timed out with run time of "+ maxTimeToRun + " ms for " +
expectedOutput.size + " batches of " + batchDuration)
}
// Verify whether the output is as expected
verifyOutput(mergedOutput, expectedOutput)
if (fileGeneratingThread.isAlive) fileGeneratingThread.interrupt()
}
/** Sets up the stream operations with file input stream */
def setupStreamsWithFileStream(
operation: DStream[String] => DStream[(String, Int)]
): StreamingContext = {
val ssc = new StreamingContext("local[4]", "FailureSuite", batchDuration)
ssc.checkpoint(checkpointDir.getPath)
val inputStream = ssc.textFileStream(testDir.getPath)
val operatedStream = operation(inputStream)
val outputBuffer = new ArrayBuffer[Seq[(String, Int)]] with SynchronizedBuffer[Seq[(String, Int)]]
val outputStream = new TestOutputStream(operatedStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc
}
/**
* Runs the streams set up in `ssc` on real clock.
*/
def runStreamsWithRealClock(
ssc: StreamingContext,
lastExpectedOutput: (String, Int),
timeout: Long
): (Seq[(String, Int)], Long) = {
System.clearProperty("spark.streaming.clock")
// Get the output buffer
val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[(String, Int)]]
val output = outputStream.output
val startTime = System.currentTimeMillis()
// Functions to detect various conditions
def hasFailed = FailureSuite.failed
def isLastOutputGenerated = !output.flatMap(x => x).isEmpty && output(output.lastIndexWhere(!_.isEmpty)).head == lastExpectedOutput
def isTimedOut = System.currentTimeMillis() - startTime > timeout
// Start the streaming computation and let it run while ...
// (i) StreamingContext has not been shut down yet
// (ii) The last expected output has not been generated yet
// (iii) Its not timed out yet
try {
ssc.start()
while (!hasFailed && !isLastOutputGenerated && !isTimedOut) {
Thread.sleep(100)
}
logInfo("Has failed = " + hasFailed)
logInfo("Is last output generated = " + isLastOutputGenerated)
logInfo("Is timed out = " + isTimedOut)
} catch {
case e: Exception => logInfo("Exception while running streams: " + e)
} finally {
ssc.stop()
}
// Verify whether the output of each batch has only one element
assert(output.forall(_.size <= 1), "output of each batch should have only one element")
// Set appropriate flags is timed out or output has been generated
if (isTimedOut) FailureSuite.timedOut = true
if (isLastOutputGenerated) FailureSuite.outputGenerated = true
val timeTaken = System.currentTimeMillis() - startTime
logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms")
(output.flatMap(_.headOption), timeTaken)
}
/**
* Verifies the output value are the same as expected. Since failures can lead to
* a batch being processed twice, a batches output may appear more than once
* consecutively. To avoid getting confused with those, we eliminate consecutive
* duplicate batch outputs of values from the `output`. As a result, the
* expected output should not have consecutive batches with the same values as output.
*/
def verifyOutput(output: Seq[(String, Int)], expectedOutput: Seq[(String, Int)]) {
// Verify whether expected outputs do not consecutive batches with same output
for (i <- 0 until expectedOutput.size - 1) {
assert(expectedOutput(i) != expectedOutput(i+1),
"Expected output has consecutive duplicate sequence of values")
}
// Match the output with the expected output
logInfo(
"\n-------------------------------------------\n" +
" Verifying output " +
"\n-------------------------------------------\n"
)
logInfo("Expected output, size = " + expectedOutput.size)
logInfo(expectedOutput.mkString("[", ",", "]"))
logInfo("Output, size = " + output.size)
logInfo(output.mkString("[", ",", "]"))
output.foreach(o =>
assert(expectedOutput.contains(o), "Expected value " + o + " not found")
)
}
}
object FailureSuite {
var failed = false
var outputGenerated = false
var timedOut = false
var failureCount = 0
def reset() {
failed = false
outputGenerated = false
timedOut = false
failureCount = 0
}
}
/**
* Thread to kill streaming context after some time.
*/
class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
initLogging()
override def run() {
try {
var minKillWaitTime = if (FailureSuite.failureCount == 0) 5000 else 1000 // to allow the first checkpoint
val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
logInfo("Kill wait time = " + killWaitTime)
Thread.sleep(killWaitTime)
logInfo(
"\n---------------------------------------\n" +
"Killing streaming context after " + killWaitTime + " ms" +
"\n---------------------------------------\n"
)
if (ssc != null) {
ssc.stop()
FailureSuite.failed = true
FailureSuite.failureCount += 1
}
logInfo("Killing thread exited")
} catch {
case ie: InterruptedException => logInfo("Killing thread interrupted")
case e: Exception => logWarning("Exception in killing thread", e)
}
}
}
/**
* Thread to generate input files periodically with the desired text
*/
class FileGeneratingThread(input: Seq[String], testDir: String, interval: Long)
extends Thread with Logging {
initLogging()
override def run() {
try {
Thread.sleep(5000) // To make sure that all the streaming context has been set up
for (i <- 0 until input.size) {
FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
Thread.sleep(interval)
}
logInfo("File generating thread exited")
} catch {
case ie: InterruptedException => logInfo("File generating thread interrupted")
case e: Exception => logWarning("File generating in killing thread", e)
}
MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration)
}
}

View file

@ -133,26 +133,29 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("file input stream") {
// Disable manual clock as FileInputDStream does not work with manual clock
System.clearProperty("spark.streaming.clock")
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
val ssc = new StreamingContext(master, framework, batchDuration)
val filestream = ssc.textFileStream(testDir.toString)
val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
val outputStream = new TestOutputStream(filestream, outputBuffer)
val outputStream = new TestOutputStream(fileStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
// Create files in the temporary directory so that Spark Streaming can read data from it
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
val expectedOutput = input.map(_.toString)
Thread.sleep(1000)
for (i <- 0 until input.size) {
FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
Thread.sleep(500)
clock.addToTime(batchDuration.milliseconds)
//Thread.sleep(100)
val file = new File(testDir, i.toString)
FileUtils.writeStringToFile(file, input(i).toString + "\n")
logInfo("Created file " + file)
Thread.sleep(batchDuration.milliseconds)
Thread.sleep(1000)
}
val startTime = System.currentTimeMillis()
Thread.sleep(1000)
@ -171,16 +174,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
assert(output.size === expectedOutput.size)
for (i <- 0 until output.size) {
assert(output(i).size === 1)
assert(output(i).head.toString === expectedOutput(i))
}
assert(output.toList === expectedOutput.toList)
FileUtils.deleteDirectory(testDir)
// Enable manual clock back again for other tests
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
}
}
/** This is server to test the network input stream */
class TestServer(port: Int) extends Logging {
val queue = new ArrayBlockingQueue[String](100)

View file

@ -63,20 +63,28 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
*/
trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Name of the framework for Spark context
def framework = "TestSuiteBase"
// Master for Spark context
def master = "local[2]"
// Batch duration
def batchDuration = Seconds(1)
// Directory where the checkpoint data will be saved
def checkpointDir = "checkpoint"
// Duration after which the graph is checkpointed
def checkpointInterval = batchDuration
// Number of partitions of the input parallel collections created for testing
def numInputPartitions = 2
// Maximum time to wait before the test times out
def maxWaitTimeMillis = 10000
// Whether to actually wait in real time before changing manual clock
def actuallyWait = false
/**
@ -140,9 +148,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
numBatches: Int,
numExpectedOutput: Int
): Seq[Seq[V]] = {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
assert(numBatches > 0, "Number of batches to run stream computation is zero")
assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
@ -186,7 +191,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
} finally {
ssc.stop()
}
output
}

View file

@ -5,6 +5,8 @@ import collection.mutable.ArrayBuffer
class WindowOperationsSuite extends TestSuiteBase {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
override def framework = "WindowOperationsSuite"
override def maxWaitTimeMillis = 20000