Fixed a bug in FileInputDStream that allowed it to miss new files. Added tests to InputStreamsSuite covering checkpointing of file and network streams.

Tathagata Das 2012-11-11 13:20:09 -08:00
parent 04e9e9d93c
commit 46222dc56d
2 changed files with 148 additions and 22 deletions

FileInputDStream.scala

@@ -6,7 +6,8 @@ import spark.rdd.UnionRDD
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import java.io.{ObjectInputStream, IOException}
import scala.collection.mutable.HashSet
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
@@ -19,7 +20,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
var lastModTime: Long = 0
var lastModTime = 0L
val lastModTimeFiles = new HashSet[String]()
def path(): Path = {
if (path_ == null) path_ = new Path(directory)
@@ -40,22 +42,37 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
override def stop() { }
/**
 * Finds the files that were modified since the last time this method was called and makes
 * a union RDD out of them. Note that this maintains the set of files that carried the
 * latest modification time seen in the previous call to this method. This is necessary
 * because the modification time returned by the FileStatus API appears to have only
 * second-level granularity, so new files may have the same modification time as the
 * latest one seen in the previous call, and the maintained set is used to filter out
 * the ones that have already been processed.
 */
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
// Create the filter for selecting new files
val newFilter = new PathFilter() {
var latestModTime = 0L
val latestModTimeFiles = new HashSet[String]()
def accept(path: Path): Boolean = {
if (!filter.accept(path)) {
return false
} else {
val modTime = fs.getFileStatus(path).getModificationTime()
if (modTime <= lastModTime) {
if (modTime < lastModTime) {
return false
} else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
return false
}
if (modTime > latestModTime) {
latestModTime = modTime
latestModTimeFiles.clear()
}
latestModTimeFiles += path.toString
return true
}
}
@@ -64,7 +81,12 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
val newFiles = fs.listStatus(path, newFilter)
logInfo("New files: " + newFiles.map(_.getPath).mkString(", "))
if (newFiles.length > 0) {
lastModTime = newFilter.latestModTime
// Update the modification time and the files processed for that modification time
if (lastModTime != newFilter.latestModTime) {
lastModTime = newFilter.latestModTime
lastModTimeFiles.clear()
}
lastModTimeFiles ++= newFilter.latestModTimeFiles
}
val newRDD = new UnionRDD(ssc.sc, newFiles.map(
file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString)))
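The compute() logic above rests on one observation, spelled out in its doc comment: the FileStatus modification time is only second-granular, so a filter that keeps only files strictly newer than the last batch can silently miss files created within the same second. The fix therefore remembers both the latest modification time and the set of paths already processed at that time. The following is a minimal, self-contained sketch of that bookkeeping outside of Spark and Hadoop; FileMeta and NewFileTracker are hypothetical stand-ins for FileStatus and the DStream state, meant to illustrate the idea rather than mirror the committed class.

import scala.collection.mutable.HashSet

// Hypothetical stand-in for the (path, modification time) pair that a
// FileSystem listing would provide.
case class FileMeta(path: String, modTime: Long)

class NewFileTracker {
  // Latest modification time seen so far, and the files that share it.
  private var lastModTime = 0L
  private val lastModTimeFiles = new HashSet[String]()

  /** Returns the files from `candidates` that have not been processed before. */
  def selectNewFiles(candidates: Seq[FileMeta]): Seq[FileMeta] = {
    // Keep a file if it is strictly newer than lastModTime, or exactly as new
    // but not already recorded for that timestamp (the same-second tie case).
    val fresh = candidates.filter { f =>
      f.modTime > lastModTime ||
        (f.modTime == lastModTime && !lastModTimeFiles.contains(f.path))
    }
    if (fresh.nonEmpty) {
      val latest = fresh.map(_.modTime).max
      if (latest != lastModTime) {
        lastModTime = latest
        lastModTimeFiles.clear()
      }
      // Remember the files stamped with the latest time so that a later call
      // can skip them even if later-arriving files reuse the same timestamp.
      lastModTimeFiles ++= fresh.filter(_.modTime == latest).map(_.path)
    }
    fresh
  }
}

Clearing the set only when the latest timestamp advances keeps it bounded to the files of a single second rather than growing with the whole history, which is the same trade-off the committed code makes with latestModTimeFiles.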

InputStreamsSuite.scala

@@ -9,12 +9,19 @@ import spark.storage.StorageLevel
import spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
class InputStreamsSuite extends TestSuiteBase {
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
override def checkpointDir = "checkpoint"
after {
FileUtils.deleteDirectory(new File(checkpointDir))
}
test("network input stream") {
// Start the server
val serverPort = 9999
@@ -30,7 +37,7 @@ class InputStreamsSuite extends TestSuiteBase {
ssc.registerOutputStream(outputStream)
ssc.start()
// Feed data to the server to send to the Spark Streaming network receiver
// Feed data to the server to send to the network receiver
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
val expectedOutput = input.map(_.toString)
@@ -52,7 +59,7 @@ class InputStreamsSuite extends TestSuiteBase {
logInfo("Stopping context")
ssc.stop()
// Verify whether data received by Spark Streaming was as expected
// Verify whether data received was as expected
logInfo("--------------------------------")
logInfo("output.size = " + outputBuffer.size)
logInfo("output")
@@ -69,6 +76,49 @@ class InputStreamsSuite extends TestSuiteBase {
}
}
test("network input stream with checkpoint") {
// Start the server
val serverPort = 9999
val server = new TestServer(serverPort)
server.start()
// Set up the streaming context and input streams
var ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)
val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK)
var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]])
ssc.registerOutputStream(outputStream)
ssc.start()
// Feed data to the server to send to the network receiver
var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
for (i <- Seq(1, 2, 3)) {
server.send(i.toString + "\n")
Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
}
Thread.sleep(500)
assert(outputStream.output.size > 0)
ssc.stop()
// Restart stream computation from checkpoint and feed more data to see whether
// they are being received and processed
logInfo("*********** RESTARTING ************")
ssc = new StreamingContext(checkpointDir)
ssc.start()
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
for (i <- Seq(4, 5, 6)) {
server.send(i.toString + "\n")
Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
}
Thread.sleep(500)
outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
assert(outputStream.output.size > 0)
ssc.stop()
}
test("file input stream") {
// Create a temporary directory
val dir = {
@@ -76,7 +126,7 @@ class InputStreamsSuite extends TestSuiteBase {
temp.delete()
temp.mkdirs()
temp.deleteOnExit()
println("Created temp dir " + temp)
logInfo("Created temp dir " + temp)
temp
}
@@ -84,7 +134,9 @@ class InputStreamsSuite extends TestSuiteBase {
val ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
val filestream = ssc.textFileStream(dir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
val outputStream = new TestOutputStream(filestream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
@@ -96,37 +148,89 @@ class InputStreamsSuite extends TestSuiteBase {
Thread.sleep(1000)
for (i <- 0 until input.size) {
FileUtils.writeStringToFile(new File(dir, i.toString), input(i).toString + "\n")
Thread.sleep(500)
Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
Thread.sleep(500)
Thread.sleep(100)
}
val startTime = System.currentTimeMillis()
while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
println("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size)
while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
//println("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
Thread.sleep(100)
}
Thread.sleep(1000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
println("Stopping context")
logInfo("Stopping context")
ssc.stop()
// Verify whether data received by Spark Streaming was as expected
logInfo("--------------------------------")
logInfo("output.size = " + outputBuffer.size)
logInfo("output.size = " + output.size)
logInfo("output")
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output.size = " + expectedOutput.size)
logInfo("expected output")
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
assert(outputBuffer.size === expectedOutput.size)
for (i <- 0 until outputBuffer.size) {
assert(outputBuffer(i).size === 1)
assert(outputBuffer(i).head === expectedOutput(i))
assert(output.size === expectedOutput.size)
for (i <- 0 until output.size) {
assert(output(i).size === 1)
assert(output(i).head.toString === expectedOutput(i))
}
}
test("file input stream with checkpoint") {
// Create a temporary directory
val dir = {
var temp = File.createTempFile(".temp.", Random.nextInt().toString)
temp.delete()
temp.mkdirs()
temp.deleteOnExit()
println("Created temp dir " + temp)
temp
}
// Set up the streaming context and input streams
var ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)
val filestream = ssc.textFileStream(dir.toString)
var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
ssc.registerOutputStream(outputStream)
ssc.start()
// Create files and advance manual clock to process them
var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n")
Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
}
Thread.sleep(500)
logInfo("Output = " + outputStream.output.mkString(","))
assert(outputStream.output.size > 0)
ssc.stop()
// Restart stream computation from checkpoint and create more files to see whether
// they are being processed
logInfo("*********** RESTARTING ************")
ssc = new StreamingContext(checkpointDir)
ssc.start()
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(500)
for (i <- Seq(4, 5, 6)) {
FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n")
Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
}
Thread.sleep(500)
outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
logInfo("Output = " + outputStream.output.mkString(","))
assert(outputStream.output.size > 0)
ssc.stop()
}
}
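Both new tests exercise the same restart shape: configure checkpointing with ssc.checkpoint(checkpointDir, checkpointInterval), stop the context, then rebuild it from the checkpoint directory with new StreamingContext(checkpointDir) and keep feeding data. A driver that wants the same behavior outside a test might wrap that decision as in the sketch below; only the StreamingContext constructors come from the code above, while the object, method, and parameter names are hypothetical.

import java.io.File
import spark.streaming.StreamingContext

// Illustrative helper, not part of this commit or of the Spark Streaming API.
object CheckpointedDriver {

  /**
   * Resume the computation from `checkpointDir` if a previous run left a
   * checkpoint there (as the tests do after "*********** RESTARTING ************");
   * otherwise build a fresh context with the caller-supplied `setup` function,
   * which is expected to set the batch duration, call ssc.checkpoint(...) and
   * register the input and output streams, mirroring the test setup above.
   */
  def resumeOrCreate(checkpointDir: String,
                     master: String,
                     appName: String,
                     setup: StreamingContext => Unit): StreamingContext = {
    if (new File(checkpointDir).exists()) {
      // Rebuilds the DStream graph and progress recorded in the checkpoint files.
      new StreamingContext(checkpointDir)
    } else {
      val ssc = new StreamingContext(master, appName)
      setup(ssc)
      ssc
    }
  }
}

The tests skip this existence check because they stop and restart explicitly within a single run; a long-lived driver would not know in advance whether a checkpoint from a previous run exists.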