Changed method name of createReceiver to getReceiver as it is not intended to be a factory.

This commit is contained in:
Prashant Sharma 2013-01-16 14:34:58 +05:30
parent 11bbe23140
commit bb6ab92e31
7 changed files with 29 additions and 29 deletions

View file

@@ -22,7 +22,7 @@ class TwitterInputDStream(
storageLevel: StorageLevel
) extends NetworkInputDStream[Status](ssc_) {
override def createReceiver(): NetworkReceiver[Status] = {
override def getReceiver(): NetworkReceiver[Status] = {
new TwitterReceiver(username, password, filters, storageLevel)
}
}

View file

@@ -23,7 +23,7 @@ private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) ext
*/
private[streaming]
class NetworkInputTracker(
@transient ssc: StreamingContext,
@transient ssc: StreamingContext,
@transient networkInputStreams: Array[NetworkInputDStream[_]])
extends Logging {
@@ -65,12 +65,12 @@ class NetworkInputTracker(
def receive = {
case RegisterReceiver(streamId, receiverActor) => {
if (!networkInputStreamMap.contains(streamId)) {
throw new Exception("Register received for unexpected id " + streamId)
throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
sender ! true
}
}
case AddBlocks(streamId, blockIds, metadata) => {
val tmp = receivedBlockIds.synchronized {
if (!receivedBlockIds.contains(streamId)) {
@@ -95,8 +95,8 @@ class NetworkInputTracker(
/** This thread class runs all the receivers on the cluster. */
class ReceiverExecutor extends Thread {
val env = ssc.env
override def run() {
override def run() {
try {
SparkEnv.set(env)
startReceivers()
@@ -113,7 +113,7 @@ class NetworkInputTracker(
*/
def startReceivers() {
val receivers = networkInputStreams.map(nis => {
val rcvr = nis.createReceiver()
val rcvr = nis.getReceiver()
rcvr.setStreamId(nis.id)
rcvr
})
@@ -141,7 +141,7 @@ class NetworkInputTracker(
// Distribute the receivers and start them
ssc.sc.runJob(tempRDD, startReceiver)
}
/** Stops the receivers. */
def stopReceivers() {
// Signal the receivers to stop

View file

@@ -25,7 +25,7 @@ class FlumeInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = {
override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel)
}
}
@@ -134,4 +134,4 @@ class FlumeReceiver(
}
override def getLocationPreference = Some(host)
}
}

View file

@@ -31,7 +31,7 @@ case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
/**
* Input stream that pulls messages from a Kafka Broker.
*
*
* @param host Zookeper hostname.
* @param port Zookeper port.
* @param groupId The group id for this consumer.
@@ -54,13 +54,13 @@ class KafkaInputDStream[T: ClassManifest](
// Metadata that keeps track of which messages have already been consumed.
var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]()
/* NOT USED - Originally intended for fault-tolerance
// In case of a failure, the offets for a particular timestamp will be restored.
@transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null
override protected[streaming] def addMetadata(metadata: Any) {
metadata match {
case x : KafkaInputDStreamMetadata =>
@@ -88,14 +88,14 @@ class KafkaInputDStream[T: ClassManifest](
override protected[streaming] def restoreCheckpointData() {
super.restoreCheckpointData()
logInfo("Restoring KafkaDStream checkpoint data.")
checkpointData match {
case x : KafkaDStreamCheckpointData =>
checkpointData match {
case x : KafkaDStreamCheckpointData =>
restoredOffsets = x.savedOffsets
logInfo("Restored KafkaDStream offsets: " + savedOffsets)
}
} */
def createReceiver(): NetworkReceiver[T] = {
def getReceiver(): NetworkReceiver[T] = {
new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
@@ -103,7 +103,7 @@ class KafkaInputDStream[T: ClassManifest](
private[streaming]
class KafkaReceiver(host: String, port: Int, groupId: String,
topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel) extends NetworkReceiver[Any] {
// Timeout for establishing a connection to Zookeper in ms.
@@ -130,7 +130,7 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
val zooKeeperEndPoint = host + ":" + port
logInfo("Starting Kafka Consumer Stream with group: " + groupId)
logInfo("Initial offsets: " + initialOffsets.toString)
// Zookeper connection properties
val props = new Properties()
props.put("zk.connect", zooKeeperEndPoint)
@@ -161,7 +161,7 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
offsets.foreach { case(key, offset) =>
val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic)
val partitionName = key.brokerId + "-" + key.partId
updatePersistentPath(consumerConnector.zkClient,
updatePersistentPath(consumerConnector.zkClient,
topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString)
}
}
@@ -174,7 +174,7 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
blockGenerator += msgAndMetadata.message
// Updating the offet. The key is (broker, topic, group, partition).
val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
groupId, msgAndMetadata.topicInfo.partition.partId)
val offset = msgAndMetadata.topicInfo.getConsumeOffset
offsets.put(key, offset)
@@ -182,12 +182,12 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
// Keep on handling messages
true
}
}
}
}
// NOT USED - Originally intended for fault-tolerance
// class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
// class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
// extends BufferingBlockCreator[Any](receiver, storageLevel) {
// override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {

View file

@@ -20,7 +20,7 @@ import java.util.concurrent.ArrayBlockingQueue
/**
* Abstract class for defining any InputDStream that has to start a receiver on worker
* nodes to receive external data. Specific implementations of NetworkInputDStream must
* define the createReceiver() function that creates the receiver object of type
* define the getReceiver() function that gets the receiver object of type
* [[spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive
* data.
* @param ssc_ Streaming context that will execute this input stream
@@ -34,11 +34,11 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
val id = ssc.getNewNetworkStreamId()
/**
* Creates the receiver object that will be sent to the worker nodes
* Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to defined by any specific implementation
* of a NetworkInputDStream.
*/
def createReceiver(): NetworkReceiver[T]
def getReceiver(): NetworkReceiver[T]
// Nothing to start or stop as both taken care of by the NetworkInputTracker.
def start() {}
@@ -46,7 +46,7 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
def stop() {}
override def compute(validTime: Time): Option[RDD[T]] = {
val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
Some(new BlockRDD[T](ssc.sc, blockIds))
}
}

View file

@@ -25,7 +25,7 @@ class RawInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_ ) with Logging {
def createReceiver(): NetworkReceiver[T] = {
def getReceiver(): NetworkReceiver[T] = {
new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
}
}

View file

@@ -15,7 +15,7 @@ class SocketInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_) {
def createReceiver(): NetworkReceiver[T] = {
def getReceiver(): NetworkReceiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}