Refactored DStreamCheckpointData.

Tathagata Das 2013-01-22 00:43:31 -08:00
parent 86057ec7c8
commit 364cdb679c
4 changed files with 99 additions and 64 deletions

View file

@@ -12,7 +12,7 @@ import scala.collection.mutable.HashMap
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
/**
@@ -75,7 +75,7 @@ abstract class DStream[T: ClassManifest] (
// Checkpoint details
protected[streaming] val mustCheckpoint = false
protected[streaming] var checkpointDuration: Duration = null
protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]())
protected[streaming] val checkpointData = new DStreamCheckpointData(this)
// Reference to whole DStream graph
protected[streaming] var graph: DStreamGraph = null
@@ -85,10 +85,10 @@ abstract class DStream[T: ClassManifest] (
// Duration for which the DStream requires its parent DStream to remember each RDD created
protected[streaming] def parentRememberDuration = rememberDuration
/** Returns the StreamingContext associated with this DStream */
/** Return the StreamingContext associated with this DStream */
def context() = ssc
/** Persists the RDDs of this DStream with the given storage level */
/** Persist the RDDs of this DStream with the given storage level */
def persist(level: StorageLevel): DStream[T] = {
if (this.isInitialized) {
throw new UnsupportedOperationException(
@@ -342,40 +342,10 @@ abstract class DStream[T: ClassManifest] (
*/
protected[streaming] def updateCheckpointData(currentTime: Time) {
logInfo("Updating checkpoint data for time " + currentTime)
// Get the checkpointed RDDs from the generated RDDs
val newRdds = generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
.map(x => (x._1, x._2.getCheckpointFile.get))
// Make a copy of the existing checkpoint data (checkpointed RDDs)
val oldRdds = checkpointData.rdds.clone()
// If the new checkpoint data has checkpoints then replace existing with the new one
if (newRdds.size > 0) {
checkpointData.rdds.clear()
checkpointData.rdds ++= newRdds
}
// Make parent DStreams update their checkpoint data
checkpointData.update()
dependencies.foreach(_.updateCheckpointData(currentTime))
// TODO: remove this, this is just for debugging
newRdds.foreach {
case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") }
}
if (newRdds.size > 0) {
(oldRdds -- newRdds.keySet).foreach {
case (time, data) => {
val path = new Path(data.toString)
val fs = path.getFileSystem(new Configuration())
fs.delete(path, true)
logInfo("Deleted checkpoint file '" + path + "' for time " + time)
}
}
}
logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.rdds.size + " checkpoints, "
+ "[" + checkpointData.rdds.mkString(",") + "]")
checkpointData.cleanup()
logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
}
/**
@@ -386,14 +356,8 @@ abstract class DStream[T: ClassManifest] (
*/
protected[streaming] def restoreCheckpointData() {
// Create RDDs from the checkpoint data
logInfo("Restoring checkpoint data from " + checkpointData.rdds.size + " checkpointed RDDs")
checkpointData.rdds.foreach {
case(time, data) => {
logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
val rdd = ssc.sc.checkpointFile[T](data.toString)
generatedRDDs += ((time, rdd))
}
}
logInfo("Restoring checkpoint data from " + checkpointData.checkpointFiles.size + " checkpointed RDDs")
checkpointData.restore()
dependencies.foreach(_.restoreCheckpointData())
logInfo("Restored checkpoint data")
}
@@ -651,7 +615,3 @@ abstract class DStream[T: ClassManifest] (
ssc.registerOutputStream(this)
}
}
private[streaming]
case class DStreamCheckpointData(rdds: HashMap[Time, Any])
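After this change the two DStream methods reduce to delegating into the checkpointData object. Consolidated from the hunks above, with nothing beyond what the diff itself introduces and descriptive comments added:

  protected[streaming] def updateCheckpointData(currentTime: Time) {
    logInfo("Updating checkpoint data for time " + currentTime)
    checkpointData.update()                                    // record the files of newly checkpointed RDDs
    dependencies.foreach(_.updateCheckpointData(currentTime))  // let parent DStreams do the same
    checkpointData.cleanup()                                   // delete files superseded by the latest update
    logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
  }

  protected[streaming] def restoreCheckpointData() {
    logInfo("Restoring checkpoint data from " + checkpointData.checkpointFiles.size + " checkpointed RDDs")
    checkpointData.restore()                                   // recreate RDDs from the recorded checkpoint files
    dependencies.foreach(_.restoreCheckpointData())
    logInfo("Restored checkpoint data")
  }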

View file

@@ -0,0 +1,84 @@
package spark.streaming
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import collection.mutable.HashMap
import spark.Logging
private[streaming]
class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
extends Serializable with Logging {
private[streaming] val checkpointFiles = new HashMap[Time, String]()
@transient private lazy val fileSystem =
new Path(dstream.context.checkpointDir).getFileSystem(new Configuration())
@transient private var lastCheckpointFiles: HashMap[Time, String] = null
/**
* Update the checkpoint data of the DStream. The default implementation records the checkpoint files
* to which the generated RDDs of the DStream have been saved.
*/
def update() {
// Get the checkpointed RDDs from the generated RDDs
val newCheckpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
.map(x => (x._1, x._2.getCheckpointFile.get))
// Make a copy of the existing checkpoint data (checkpointed RDDs)
lastCheckpointFiles = checkpointFiles.clone()
// If the new checkpoint data has checkpoints, then replace the existing checkpoint files with the new ones
if (newCheckpointFiles.size > 0) {
checkpointFiles.clear()
checkpointFiles ++= newCheckpointFiles
}
// TODO: remove this, this is just for debugging
newCheckpointFiles.foreach {
case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") }
}
}
/**
* Clean up old checkpoint data. The default implementation deletes old checkpoint files.
*/
def cleanup() {
// If there is at least one checkpoint file in the current checkpoint files,
// then delete the old checkpoint files.
if (checkpointFiles.size > 0 && lastCheckpointFiles != null) {
(lastCheckpointFiles -- checkpointFiles.keySet).foreach {
case (time, file) => {
try {
val path = new Path(file)
fileSystem.delete(path, true)
logInfo("Deleted checkpoint file '" + file + "' for time " + time)
} catch {
case e: Exception =>
logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e)
}
}
}
}
}
/**
* Restore the checkpoint data. The default implementation restores the RDDs from their
* checkpoint files.
*/
def restore() {
// Create RDDs from the checkpoint data
checkpointFiles.foreach {
case(time, file) => {
logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'")
dstream.generatedRDDs += ((time, dstream.context.sc.checkpointFile[T](file)))
}
}
}
override def toString() = {
"[\n" + checkpointFiles.size + "\n" + checkpointFiles.mkString("\n") + "\n]"
}
}
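Pulling this logic into its own class means a stream can carry extra state through the same update/cleanup/restore lifecycle by subclassing it. Below is a minimal sketch of such a subclass, for illustration only; OffsetCheckpointData and savedOffsets are made-up names and not part of this commit:

  package spark.streaming

  import collection.mutable.HashMap

  // Illustrative subclass: checkpoints per-batch partition offsets alongside the
  // RDD checkpoint files, reusing the lifecycle defined above.
  private[streaming]
  class OffsetCheckpointData[T: ClassManifest](dstream: DStream[T])
    extends DStreamCheckpointData[T](dstream) {

    // Extra state saved with the checkpoint, keyed by batch time (illustrative)
    private[streaming] val savedOffsets = new HashMap[Time, Map[Int, Long]]()

    override def update() {
      super.update()    // still record the checkpoint files of the generated RDDs
      // record the offsets corresponding to the latest generated batches here
    }

    override def cleanup() {
      super.cleanup()   // still delete checkpoint files superseded by the latest update()
      // drop savedOffsets entries for batches whose checkpoint files were deleted
    }

    override def restore() {
      super.restore()   // still recreate RDDs from the recorded checkpoint files
      // re-seed the receiver from savedOffsets before processing resumes
    }
  }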

View file

@@ -19,15 +19,6 @@ import scala.collection.JavaConversions._
// Key for a specific Kafka Partition: (broker, topic, group, part)
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
// NOT USED - Originally intended for fault-tolerance
// Metadata for a Kafka Stream that it sent to the Master
private[streaming]
case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
// NOT USED - Originally intended for fault-tolerance
// Checkpoint data specific to a KafkaInputDstream
private[streaming]
case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
/**
* Input stream that pulls messages from a Kafka Broker.

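If the Kafka offset tracking removed above were revived under this design, the stream would install its own checkpoint data by overriding the checkpointData val this commit introduces, rather than by defining a separate case class. A sketch under that assumption; MyOffsetTrackingDStream is an illustrative name, and it reuses the hypothetical OffsetCheckpointData from the previous sketch:

  package spark.streaming

  // Illustrative only: a DStream that swaps in stream-specific checkpoint data by
  // overriding the checkpointData val added in this commit. The rest of the stream
  // (dependencies, slideDuration, compute) is left abstract here.
  abstract class MyOffsetTrackingDStream[T: ClassManifest](ssc_ : StreamingContext)
    extends DStream[T](ssc_) {

    // Replace the default DStreamCheckpointData with the offset-aware subclass
    protected[streaming] override val checkpointData = new OffsetCheckpointData(this)
  }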
View file

@@ -63,9 +63,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// then check whether some RDD has been checkpointed or not
ssc.start()
runStreamsWithRealDelay(ssc, firstNumBatches)
logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.rdds.mkString(",\n") + "]")
assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.rdds.foreach {
logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
@@ -74,7 +74,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a further time such that previous checkpoint files in the stream would be deleted
// and check whether the earlier checkpoint files are deleted
val checkpointFiles = stateStream.checkpointData.rdds.map(x => new File(x._2.toString))
val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
runStreamsWithRealDelay(ssc, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
@@ -91,8 +91,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// is present in the checkpoint data or not
ssc.start()
runStreamsWithRealDelay(ssc, 1)
assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.rdds.foreach {
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(),