Comment out code for fault-tolerance.

Denny 2012-11-19 09:42:35 -08:00
parent f56befa914
commit 6757ed6a40


@@ -15,8 +15,10 @@ import spark.storage.StorageLevel
 // Key for a specific Kafka Partition: (broker, topic, group, part)
 case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
 
+// NOT USED - Originally intended for fault-tolerance
 // Metadata for a Kafka Stream that is sent to the Master
 case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
+// NOT USED - Originally intended for fault-tolerance
 // Checkpoint data specific to a KafkaInputDStream
 case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
     savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
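For context, a minimal usage sketch of the now-unused metadata type; the broker, topic, group, and partition values below are hypothetical illustrations, not taken from this commit:

    // Hypothetical example: snapshot the offsets consumed so far, keyed by
    // (broker, topic, group, partition), and timestamp it for the master.
    val key = KafkaPartitionKey(brokerId = 0, topic = "events", groupId = "group1", partId = 0)
    val meta = KafkaInputDStreamMetadata(System.currentTimeMillis, Map(key -> 42L))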
@@ -45,9 +47,13 @@ class KafkaInputDStream[T: ClassManifest](
 
   // Metadata that keeps track of which messages have already been consumed.
   var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]()
+
+  /* NOT USED - Originally intended for fault-tolerance
   // In case of a failure, the offsets for a particular timestamp will be restored.
   @transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null
 
+
   override protected[streaming] def addMetadata(metadata: Any) {
     metadata match {
       case x : KafkaInputDStreamMetadata =>
@@ -80,18 +86,11 @@ class KafkaInputDStream[T: ClassManifest](
         restoredOffsets = x.savedOffsets
         logInfo("Restored KafkaDStream offsets: " + savedOffsets)
       }
     }
-  }
-
+  } */
 
   def createReceiver(): NetworkReceiver[T] = {
-    // We have restored from a checkpoint, use the restored offsets
-    if (restoredOffsets != null) {
-      new KafkaReceiver(id, host, port, groupId, topics, restoredOffsets, storageLevel)
-        .asInstanceOf[NetworkReceiver[T]]
-    } else {
-      new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel)
-        .asInstanceOf[NetworkReceiver[T]]
-    }
+    new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel)
+      .asInstanceOf[NetworkReceiver[T]]
   }
 }
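The net effect of this hunk is that createReceiver no longer branches on checkpoint state. A condensed sketch of the disabled restore logic, assembled from the removed lines (all names as they appear in the diff):

    // Prefer offsets restored from a checkpoint; otherwise fall back to the
    // user-supplied initial offsets. This commit hard-wires the fallback.
    val startOffsets = if (restoredOffsets != null) restoredOffsets else initialOffsets
    new KafkaReceiver(id, host, port, groupId, topics, startOffsets, storageLevel)
      .asInstanceOf[NetworkReceiver[T]]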
@@ -103,7 +102,7 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
   val ZK_TIMEOUT = 10000
 
   // Handles pushing data into the BlockManager
-  lazy protected val dataHandler = new KafkaDataHandler(this, storageLevel)
+  lazy protected val dataHandler = new DataHandler(this, storageLevel)
   // Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset
   lazy val offsets = HashMap[KafkaPartitionKey, Long]()
   // Connection to Kafka
@@ -181,13 +180,15 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
     }
   }
 
-  class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
-  extends DataHandler[Any](receiver, storageLevel) {
+  // NOT USED - Originally intended for fault-tolerance
+  // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
+  // extends DataHandler[Any](receiver, storageLevel) {
 
-    override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
-      // Creates a new Block with Kafka-specific Metadata
-      new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap))
-    }
+  //   override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
+  //     // Creates a new Block with Kafka-specific Metadata
+  //     new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap))
+  //   }
+
 
-  }
+  // }
 }
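With KafkaDataHandler commented out, blocks are pushed through the generic DataHandler and no longer carry Kafka-specific metadata. A sketch of the subclass that would re-enable that behavior, reconstructed from the commented lines above (assuming DataHandler[Any] exposes createBlock with the signature shown in the diff):

    // Re-attaching offset metadata to each block would look like this:
    class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
        extends DataHandler[Any](receiver, storageLevel) {
      // Tag each block with a timestamped snapshot of the consumed offsets,
      // so the master could restore them after a failure.
      override def createBlock(blockId: String, iterator: Iterator[Any]): Block =
        new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap))
    }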