Merge branch 'dev' into kafka
Conflicts: streaming/src/main/scala/spark/streaming/DStream.scala
This commit is contained in:
commit
2aceae25be
|
@ -189,11 +189,7 @@ abstract class RDD[T: ClassManifest](
|
|||
|
||||
def getCheckpointData(): Any = {
|
||||
synchronized {
|
||||
if (isCheckpointed) {
|
||||
checkpointFile
|
||||
} else {
|
||||
null
|
||||
}
|
||||
checkpointFile
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ import spark.{Logging, Utils}
|
|||
import org.apache.hadoop.fs.{FileUtil, Path}
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
|
||||
import java.io.{InputStream, ObjectStreamClass, ObjectInputStream, ObjectOutputStream}
|
||||
import java.io._
|
||||
|
||||
|
||||
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
|
||||
|
@ -18,8 +18,6 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
|
|||
val checkpointDir = ssc.checkpointDir
|
||||
val checkpointInterval = ssc.checkpointInterval
|
||||
|
||||
validate()
|
||||
|
||||
def validate() {
|
||||
assert(master != null, "Checkpoint.master is null")
|
||||
assert(framework != null, "Checkpoint.framework is null")
|
||||
|
@ -27,35 +25,50 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
|
|||
assert(checkpointTime != null, "Checkpoint.checkpointTime is null")
|
||||
logInfo("Checkpoint for time " + checkpointTime + " validated")
|
||||
}
|
||||
}
|
||||
|
||||
def save(path: String) {
|
||||
val file = new Path(path, "graph")
|
||||
val conf = new Configuration()
|
||||
val fs = file.getFileSystem(conf)
|
||||
logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
|
||||
if (fs.exists(file)) {
|
||||
val bkFile = new Path(file.getParent, file.getName + ".bk")
|
||||
FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
|
||||
logDebug("Moved existing checkpoint file to " + bkFile)
|
||||
/**
|
||||
* Convenience class to speed up the writing of graph checkpoint to file
|
||||
*/
|
||||
class CheckpointWriter(checkpointDir: String) extends Logging {
|
||||
val file = new Path(checkpointDir, "graph")
|
||||
val conf = new Configuration()
|
||||
var fs = file.getFileSystem(conf)
|
||||
val maxAttempts = 3
|
||||
|
||||
def write(checkpoint: Checkpoint) {
|
||||
// TODO: maybe do this in a different thread from the main stream execution thread
|
||||
var attempts = 0
|
||||
while (attempts < maxAttempts) {
|
||||
attempts += 1
|
||||
try {
|
||||
logDebug("Saving checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'")
|
||||
if (fs.exists(file)) {
|
||||
val bkFile = new Path(file.getParent, file.getName + ".bk")
|
||||
FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
|
||||
logDebug("Moved existing checkpoint file to " + bkFile)
|
||||
}
|
||||
val fos = fs.create(file)
|
||||
val oos = new ObjectOutputStream(fos)
|
||||
oos.writeObject(checkpoint)
|
||||
oos.close()
|
||||
logInfo("Checkpoint for time " + checkpoint.checkpointTime + " saved to file '" + file + "'")
|
||||
fos.close()
|
||||
return
|
||||
} catch {
|
||||
case ioe: IOException =>
|
||||
logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
|
||||
}
|
||||
}
|
||||
val fos = fs.create(file)
|
||||
val oos = new ObjectOutputStream(fos)
|
||||
oos.writeObject(this)
|
||||
oos.close()
|
||||
fs.close()
|
||||
logInfo("Checkpoint of streaming context for time " + checkpointTime + " saved successfully to file '" + file + "'")
|
||||
}
|
||||
|
||||
def toBytes(): Array[Byte] = {
|
||||
val bytes = Utils.serialize(this)
|
||||
bytes
|
||||
logError("Could not write checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'")
|
||||
}
|
||||
}
|
||||
|
||||
object Checkpoint extends Logging {
|
||||
|
||||
def load(path: String): Checkpoint = {
|
||||
|
||||
object CheckpointReader extends Logging {
|
||||
|
||||
def read(path: String): Checkpoint = {
|
||||
val fs = new Path(path).getFileSystem(new Configuration())
|
||||
val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
|
||||
|
||||
|
@ -82,17 +95,11 @@ object Checkpoint extends Logging {
|
|||
logError("Error loading checkpoint from file '" + file + "'", e)
|
||||
}
|
||||
} else {
|
||||
logWarning("Could not load checkpoint from file '" + file + "' as it does not exist")
|
||||
logWarning("Could not read checkpoint from file '" + file + "' as it does not exist")
|
||||
}
|
||||
|
||||
})
|
||||
throw new Exception("Could not load checkpoint from path '" + path + "'")
|
||||
}
|
||||
|
||||
def fromBytes(bytes: Array[Byte]): Checkpoint = {
|
||||
val cp = Utils.deserialize[Checkpoint](bytes)
|
||||
cp.validate()
|
||||
cp
|
||||
throw new Exception("Could not read checkpoint from path '" + path + "'")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -233,7 +233,7 @@ extends Serializable with Logging {
|
|||
}
|
||||
if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) {
|
||||
newRDD.checkpoint()
|
||||
logInfo("Marking RDD for time " + time + " for checkpointing at time " + time)
|
||||
logInfo("Marking RDD " + newRDD + " for time " + time + " for checkpointing at time " + time)
|
||||
}
|
||||
generatedRDDs.put(time, newRDD)
|
||||
Some(newRDD)
|
||||
|
@ -300,6 +300,9 @@ extends Serializable with Logging {
|
|||
* this method to save custom checkpoint data.
|
||||
*/
|
||||
protected[streaming] def updateCheckpointData(currentTime: Time) {
|
||||
|
||||
logInfo("Updating checkpoint data for time " + currentTime)
|
||||
|
||||
// Get the checkpointed RDDs from the generated RDDs
|
||||
val newRdds = generatedRDDs.filter(_._2.getCheckpointData() != null)
|
||||
.map(x => (x._1, x._2.getCheckpointData()))
|
||||
|
@ -342,8 +345,11 @@ extends Serializable with Logging {
|
|||
logInfo("Restoring checkpoint data from " + checkpointData.rdds.size + " checkpointed RDDs")
|
||||
checkpointData.rdds.foreach {
|
||||
case(time, data) => {
|
||||
logInfo("Restoring checkpointed RDD for time " + time + " from file")
|
||||
generatedRDDs += ((time, ssc.sc.objectFile[T](data.toString)))
|
||||
logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
|
||||
val rdd = ssc.sc.objectFile[T](data.toString)
|
||||
// Set the checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData()
|
||||
rdd.checkpointFile = data.toString
|
||||
generatedRDDs += ((time, rdd))
|
||||
}
|
||||
}
|
||||
dependencies.foreach(_.restoreCheckpointData())
|
||||
|
|
|
@ -57,7 +57,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
|
|||
|
||||
override def checkpoint(interval: Time): DStream[(K, V)] = {
|
||||
super.checkpoint(interval)
|
||||
reducedStream.checkpoint(interval)
|
||||
//reducedStream.checkpoint(interval)
|
||||
this
|
||||
}
|
||||
|
||||
|
|
|
@ -16,8 +16,16 @@ extends Logging {
|
|||
initLogging()
|
||||
|
||||
val graph = ssc.graph
|
||||
|
||||
val concurrentJobs = System.getProperty("spark.stream.concurrentJobs", "1").toInt
|
||||
val jobManager = new JobManager(ssc, concurrentJobs)
|
||||
|
||||
val checkpointWriter = if (ssc.checkpointInterval != null && ssc.checkpointDir != null) {
|
||||
new CheckpointWriter(ssc.checkpointDir)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
|
||||
val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
|
||||
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
|
||||
val timer = new RecurringTimer(clock, ssc.graph.batchDuration, generateRDDs(_))
|
||||
|
@ -52,19 +60,23 @@ extends Logging {
|
|||
logInfo("Scheduler stopped")
|
||||
}
|
||||
|
||||
def generateRDDs(time: Time) {
|
||||
private def generateRDDs(time: Time) {
|
||||
SparkEnv.set(ssc.env)
|
||||
logInfo("\n-----------------------------------------------------\n")
|
||||
graph.generateRDDs(time).foreach(submitJob)
|
||||
logInfo("Generated RDDs for time " + time)
|
||||
graph.generateRDDs(time).foreach(jobManager.runJob)
|
||||
graph.forgetOldRDDs(time)
|
||||
if (ssc.checkpointInterval != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointInterval)) {
|
||||
ssc.doCheckpoint(time)
|
||||
}
|
||||
doCheckpoint(time)
|
||||
logInfo("Generated RDDs for time " + time)
|
||||
}
|
||||
|
||||
def submitJob(job: Job) {
|
||||
jobManager.runJob(job)
|
||||
private def doCheckpoint(time: Time) {
|
||||
if (ssc.checkpointInterval != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointInterval)) {
|
||||
val startTime = System.currentTimeMillis()
|
||||
ssc.graph.updateCheckpointData(time)
|
||||
checkpointWriter.write(new Checkpoint(ssc, time))
|
||||
val stopTime = System.currentTimeMillis()
|
||||
logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7,20 +7,11 @@ import spark.rdd.MapPartitionsRDD
|
|||
import spark.SparkContext._
|
||||
import spark.storage.StorageLevel
|
||||
|
||||
|
||||
class StateRDD[U: ClassManifest, T: ClassManifest](
|
||||
prev: RDD[T],
|
||||
f: Iterator[T] => Iterator[U],
|
||||
rememberPartitioner: Boolean
|
||||
) extends MapPartitionsRDD[U, T](prev, f) {
|
||||
override val partitioner = if (rememberPartitioner) prev.partitioner else None
|
||||
}
|
||||
|
||||
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
|
||||
parent: DStream[(K, V)],
|
||||
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
|
||||
partitioner: Partitioner,
|
||||
rememberPartitioner: Boolean
|
||||
preservePartitioning: Boolean
|
||||
) extends DStream[(K, S)](parent.ssc) {
|
||||
|
||||
super.persist(StorageLevel.MEMORY_ONLY_SER)
|
||||
|
@ -53,7 +44,7 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
|
|||
updateFuncLocal(i)
|
||||
}
|
||||
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
|
||||
val stateRDD = new StateRDD(cogroupedRDD, finalFunc, rememberPartitioner)
|
||||
val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
|
||||
//logDebug("Generating state RDD for time " + validTime)
|
||||
return Some(stateRDD)
|
||||
}
|
||||
|
@ -78,7 +69,7 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
|
|||
}
|
||||
|
||||
val groupedRDD = parentRDD.groupByKey(partitioner)
|
||||
val sessionRDD = new StateRDD(groupedRDD, finalFunc, rememberPartitioner)
|
||||
val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
|
||||
//logDebug("Generating state RDD for time " + validTime + " (first)")
|
||||
return Some(sessionRDD)
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ final class StreamingContext (
|
|||
def this(master: String, frameworkName: String, sparkHome: String = null, jars: Seq[String] = Nil) =
|
||||
this(new SparkContext(master, frameworkName, sparkHome, jars), null)
|
||||
|
||||
def this(path: String) = this(null, Checkpoint.load(path))
|
||||
def this(path: String) = this(null, CheckpointReader.read(path))
|
||||
|
||||
def this(cp_ : Checkpoint) = this(null, cp_)
|
||||
|
||||
|
@ -85,7 +85,7 @@ final class StreamingContext (
|
|||
graph.setRememberDuration(duration)
|
||||
}
|
||||
|
||||
def checkpoint(dir: String, interval: Time) {
|
||||
def checkpoint(dir: String, interval: Time = null) {
|
||||
if (dir != null) {
|
||||
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir))
|
||||
checkpointDir = dir
|
||||
|
@ -211,12 +211,29 @@ final class StreamingContext (
|
|||
graph.addOutputStream(outputStream)
|
||||
}
|
||||
|
||||
def validate() {
|
||||
assert(graph != null, "Graph is null")
|
||||
graph.validate()
|
||||
|
||||
assert(
|
||||
checkpointDir == null || checkpointInterval != null,
|
||||
"Checkpoint directory has been set, but the graph checkpointing interval has " +
|
||||
"not been set. Please use StreamingContext.checkpoint() to set the interval."
|
||||
)
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This function starts the execution of the streams.
|
||||
*/
|
||||
def start() {
|
||||
assert(graph != null, "Graph is null")
|
||||
graph.validate()
|
||||
if (checkpointDir != null && checkpointInterval == null && graph != null) {
|
||||
checkpointInterval = graph.batchDuration
|
||||
}
|
||||
|
||||
validate()
|
||||
|
||||
val networkInputStreams = graph.getInputStreams().filter(s => s match {
|
||||
case n: NetworkInputDStream[_] => true
|
||||
|
@ -250,14 +267,6 @@ final class StreamingContext (
|
|||
case e: Exception => logWarning("Error while stopping", e)
|
||||
}
|
||||
}
|
||||
|
||||
def doCheckpoint(currentTime: Time) {
|
||||
val startTime = System.currentTimeMillis()
|
||||
graph.updateCheckpointData(currentTime)
|
||||
new Checkpoint(this, currentTime).save(checkpointDir)
|
||||
val stopTime = System.currentTimeMillis()
|
||||
logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
# Set everything to be logged to the console
|
||||
log4j.rootCategory=WARN, console
|
||||
log4j.rootCategory=INFO, console
|
||||
log4j.appender.console=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.console.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
|
||||
|
|
|
@ -24,7 +24,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
|
||||
override def framework = "CheckpointSuite"
|
||||
|
||||
override def batchDuration = Milliseconds(200)
|
||||
override def batchDuration = Milliseconds(500)
|
||||
|
||||
override def checkpointDir = "checkpoint"
|
||||
|
||||
|
@ -34,7 +34,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
|
||||
test("basic stream+rdd recovery") {
|
||||
|
||||
assert(batchDuration === Milliseconds(200), "batchDuration for this test must be 1 second")
|
||||
assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
|
||||
assert(checkpointInterval === batchDuration, "checkpointInterval for this test much be same as batchDuration")
|
||||
|
||||
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
|
||||
|
@ -134,9 +134,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
val operation = (st: DStream[String]) => {
|
||||
st.map(x => (x, 1))
|
||||
.reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
|
||||
.checkpoint(Seconds(2))
|
||||
.checkpoint(batchDuration * 2)
|
||||
}
|
||||
testCheckpointedOperation(input, operation, output, 3)
|
||||
testCheckpointedOperation(input, operation, output, 7)
|
||||
}
|
||||
|
||||
test("updateStateByKey") {
|
||||
|
@ -148,14 +148,18 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
}
|
||||
st.map(x => (x, 1))
|
||||
.updateStateByKey[RichInt](updateFunc)
|
||||
.checkpoint(Seconds(2))
|
||||
.checkpoint(batchDuration * 2)
|
||||
.map(t => (t._1, t._2.self))
|
||||
}
|
||||
testCheckpointedOperation(input, operation, output, 3)
|
||||
testCheckpointedOperation(input, operation, output, 7)
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Tests a streaming operation under checkpointing, by restarting the operation
|
||||
* from checkpoint file and verifying whether the final output is correct.
|
||||
* The output is assumed to have come from a reliable queue which can replay
|
||||
* data as required.
|
||||
*/
|
||||
def testCheckpointedOperation[U: ClassManifest, V: ClassManifest](
|
||||
input: Seq[Seq[U]],
|
||||
operation: DStream[U] => DStream[V],
|
||||
|
@ -170,8 +174,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
val initialNumExpectedOutputs = initialNumBatches
|
||||
val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs
|
||||
|
||||
// Do half the computation (half the number of batches), create checkpoint file and quit
|
||||
|
||||
// Do the computation for initial number of batches, create checkpoint file and quit
|
||||
ssc = setupStreams[U, V](input, operation)
|
||||
val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
|
||||
verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
|
||||
|
@ -193,8 +196,6 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
* Advances the manual clock on the streaming scheduler by given number of batches.
|
||||
* It also waits for the expected amount of time for each batch.
|
||||
*/
|
||||
|
||||
|
||||
def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) {
|
||||
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
|
||||
logInfo("Manual clock before advancing = " + clock.time)
|
||||
|
|
|
@ -16,24 +16,36 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
|
||||
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
|
||||
|
||||
val testPort = 9999
|
||||
var testServer: TestServer = null
|
||||
var testDir: File = null
|
||||
|
||||
override def checkpointDir = "checkpoint"
|
||||
|
||||
after {
|
||||
FileUtils.deleteDirectory(new File(checkpointDir))
|
||||
if (testServer != null) {
|
||||
testServer.stop()
|
||||
testServer = null
|
||||
}
|
||||
if (testDir != null && testDir.exists()) {
|
||||
FileUtils.deleteDirectory(testDir)
|
||||
testDir = null
|
||||
}
|
||||
}
|
||||
|
||||
test("network input stream") {
|
||||
// Start the server
|
||||
val serverPort = 9999
|
||||
val server = new TestServer(9999)
|
||||
server.start()
|
||||
testServer = new TestServer(testPort)
|
||||
testServer.start()
|
||||
|
||||
// Set up the streaming context and input streams
|
||||
val ssc = new StreamingContext(master, framework)
|
||||
ssc.setBatchDuration(batchDuration)
|
||||
val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK)
|
||||
val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
|
||||
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
|
||||
val outputStream = new TestOutputStream(networkStream, outputBuffer)
|
||||
def output = outputBuffer.flatMap(x => x)
|
||||
ssc.registerOutputStream(outputStream)
|
||||
ssc.start()
|
||||
|
||||
|
@ -41,21 +53,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
|
||||
val input = Seq(1, 2, 3, 4, 5)
|
||||
val expectedOutput = input.map(_.toString)
|
||||
Thread.sleep(1000)
|
||||
for (i <- 0 until input.size) {
|
||||
server.send(input(i).toString + "\n")
|
||||
testServer.send(input(i).toString + "\n")
|
||||
Thread.sleep(500)
|
||||
clock.addToTime(batchDuration.milliseconds)
|
||||
}
|
||||
val startTime = System.currentTimeMillis()
|
||||
while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
|
||||
logInfo("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size)
|
||||
Thread.sleep(100)
|
||||
}
|
||||
Thread.sleep(1000)
|
||||
val timeTaken = System.currentTimeMillis() - startTime
|
||||
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
|
||||
logInfo("Stopping server")
|
||||
server.stop()
|
||||
testServer.stop()
|
||||
logInfo("Stopping context")
|
||||
ssc.stop()
|
||||
|
||||
|
@ -69,24 +75,24 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
|
||||
logInfo("--------------------------------")
|
||||
|
||||
assert(outputBuffer.size === expectedOutput.size)
|
||||
for (i <- 0 until outputBuffer.size) {
|
||||
assert(outputBuffer(i).size === 1)
|
||||
assert(outputBuffer(i).head === expectedOutput(i))
|
||||
// Verify whether all the elements received are as expected
|
||||
// (whether the elements were received one in each interval is not verified)
|
||||
assert(output.size === expectedOutput.size)
|
||||
for (i <- 0 until output.size) {
|
||||
assert(output(i) === expectedOutput(i))
|
||||
}
|
||||
}
|
||||
|
||||
test("network input stream with checkpoint") {
|
||||
// Start the server
|
||||
val serverPort = 9999
|
||||
val server = new TestServer(9999)
|
||||
server.start()
|
||||
testServer = new TestServer(testPort)
|
||||
testServer.start()
|
||||
|
||||
// Set up the streaming context and input streams
|
||||
var ssc = new StreamingContext(master, framework)
|
||||
ssc.setBatchDuration(batchDuration)
|
||||
ssc.checkpoint(checkpointDir, checkpointInterval)
|
||||
val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK)
|
||||
val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
|
||||
var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]])
|
||||
ssc.registerOutputStream(outputStream)
|
||||
ssc.start()
|
||||
|
@ -94,7 +100,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
// Feed data to the server to send to the network receiver
|
||||
var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
|
||||
for (i <- Seq(1, 2, 3)) {
|
||||
server.send(i.toString + "\n")
|
||||
testServer.send(i.toString + "\n")
|
||||
Thread.sleep(100)
|
||||
clock.addToTime(batchDuration.milliseconds)
|
||||
}
|
||||
|
@ -109,7 +115,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
ssc.start()
|
||||
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
|
||||
for (i <- Seq(4, 5, 6)) {
|
||||
server.send(i.toString + "\n")
|
||||
testServer.send(i.toString + "\n")
|
||||
Thread.sleep(100)
|
||||
clock.addToTime(batchDuration.milliseconds)
|
||||
}
|
||||
|
@ -120,12 +126,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
}
|
||||
|
||||
test("file input stream") {
|
||||
|
||||
// Create a temporary directory
|
||||
val dir = {
|
||||
testDir = {
|
||||
var temp = File.createTempFile(".temp.", Random.nextInt().toString)
|
||||
temp.delete()
|
||||
temp.mkdirs()
|
||||
temp.deleteOnExit()
|
||||
logInfo("Created temp dir " + temp)
|
||||
temp
|
||||
}
|
||||
|
@ -133,10 +139,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
// Set up the streaming context and input streams
|
||||
val ssc = new StreamingContext(master, framework)
|
||||
ssc.setBatchDuration(batchDuration)
|
||||
val filestream = ssc.textFileStream(dir.toString)
|
||||
val filestream = ssc.textFileStream(testDir.toString)
|
||||
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
|
||||
def output = outputBuffer.flatMap(x => x)
|
||||
|
||||
val outputStream = new TestOutputStream(filestream, outputBuffer)
|
||||
ssc.registerOutputStream(outputStream)
|
||||
ssc.start()
|
||||
|
@ -147,16 +152,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
val expectedOutput = input.map(_.toString)
|
||||
Thread.sleep(1000)
|
||||
for (i <- 0 until input.size) {
|
||||
FileUtils.writeStringToFile(new File(dir, i.toString), input(i).toString + "\n")
|
||||
Thread.sleep(100)
|
||||
FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
|
||||
Thread.sleep(500)
|
||||
clock.addToTime(batchDuration.milliseconds)
|
||||
Thread.sleep(100)
|
||||
//Thread.sleep(100)
|
||||
}
|
||||
val startTime = System.currentTimeMillis()
|
||||
while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
|
||||
//println("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
|
||||
/*while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
|
||||
logInfo("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
|
||||
Thread.sleep(100)
|
||||
}
|
||||
}*/
|
||||
Thread.sleep(1000)
|
||||
val timeTaken = System.currentTimeMillis() - startTime
|
||||
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
|
||||
|
@ -165,14 +170,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
|
||||
// Verify whether data received by Spark Streaming was as expected
|
||||
logInfo("--------------------------------")
|
||||
logInfo("output.size = " + output.size)
|
||||
logInfo("output.size = " + outputBuffer.size)
|
||||
logInfo("output")
|
||||
output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
|
||||
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
|
||||
logInfo("expected output.size = " + expectedOutput.size)
|
||||
logInfo("expected output")
|
||||
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
|
||||
logInfo("--------------------------------")
|
||||
|
||||
// Verify whether all the elements received are as expected
|
||||
// (whether the elements were received one in each interval is not verified)
|
||||
assert(output.size === expectedOutput.size)
|
||||
for (i <- 0 until output.size) {
|
||||
assert(output(i).size === 1)
|
||||
|
@ -182,12 +189,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
|
||||
test("file input stream with checkpoint") {
|
||||
// Create a temporary directory
|
||||
val dir = {
|
||||
testDir = {
|
||||
var temp = File.createTempFile(".temp.", Random.nextInt().toString)
|
||||
temp.delete()
|
||||
temp.mkdirs()
|
||||
temp.deleteOnExit()
|
||||
println("Created temp dir " + temp)
|
||||
logInfo("Created temp dir " + temp)
|
||||
temp
|
||||
}
|
||||
|
||||
|
@ -195,7 +201,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
var ssc = new StreamingContext(master, framework)
|
||||
ssc.setBatchDuration(batchDuration)
|
||||
ssc.checkpoint(checkpointDir, checkpointInterval)
|
||||
val filestream = ssc.textFileStream(dir.toString)
|
||||
val filestream = ssc.textFileStream(testDir.toString)
|
||||
var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
|
||||
ssc.registerOutputStream(outputStream)
|
||||
ssc.start()
|
||||
|
@ -204,7 +210,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
|
||||
Thread.sleep(1000)
|
||||
for (i <- Seq(1, 2, 3)) {
|
||||
FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n")
|
||||
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
|
||||
Thread.sleep(100)
|
||||
clock.addToTime(batchDuration.milliseconds)
|
||||
}
|
||||
|
@ -221,7 +227,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
|
||||
Thread.sleep(500)
|
||||
for (i <- Seq(4, 5, 6)) {
|
||||
FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n")
|
||||
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
|
||||
Thread.sleep(100)
|
||||
clock.addToTime(batchDuration.milliseconds)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue