Added capability to take streaming input from network. Renamed SparkStreamContext to StreamingContext.
parent cae894ee7a
commit b5b93a621c
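For orientation, here is a minimal usage sketch of the API this commit introduces, modeled on the WordCountNetwork example added below. The object name, master string, and host/port are placeholders and not part of the commit:

package spark.streaming.examples

import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

// Hypothetical driver: counts words arriving over a TCP socket, assuming some
// process writes newline-separated text to localhost:9999.
object NetworkWordCountSketch {
  def main(args: Array[String]) {
    // StreamingContext is the renamed SparkStreamContext; the batch duration
    // must be set before start()
    val ssc = new StreamingContext("local", "NetworkWordCountSketch")
    ssc.setBatchDuration(Seconds(1))

    // createNetworkTextStream is the new entry point: it registers a
    // NetworkInputDStream whose receiver reads lines from the socket and
    // stores them as blocks for each batch
    val lines = ssc.createNetworkTextStream("localhost", 9999)
    val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
  }
}

Under the hood (per the diff below), StreamingContext.start() creates a NetworkInputTracker that launches one receiver per network stream as a Spark job and hands the resulting block IDs back to NetworkInputDStream.compute() at each batch interval.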
@@ -5,7 +5,7 @@ import spark.RDD
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
*/
class ConstantInputDStream[T: ClassManifest](ssc: SparkStreamContext, rdd: RDD[T])
class ConstantInputDStream[T: ClassManifest](ssc: StreamingContext, rdd: RDD[T])
extends InputDStream[T](ssc) {

override def start() {}
@@ -1,6 +1,6 @@
package spark.streaming

import spark.streaming.SparkStreamContext._
import spark.streaming.StreamingContext._

import spark.RDD
import spark.BlockRDD

@@ -15,7 +15,7 @@ import scala.collection.mutable.HashMap

import java.util.concurrent.ArrayBlockingQueue

abstract class DStream[T: ClassManifest] (@transient val ssc: SparkStreamContext)
abstract class DStream[T: ClassManifest] (@transient val ssc: StreamingContext)
extends Logging with Serializable {

initLogging()

@@ -142,7 +142,7 @@ extends Logging with Serializable {
}

/**
* This method generates a SparkStream job for the given time
* This method generates a SparkStreaming job for the given time
* and may require to be overriden by subclasses
*/
def generateJob(time: Time): Option[Job] = {

@@ -249,7 +249,7 @@ extends Logging with Serializable {

abstract class InputDStream[T: ClassManifest] (
ssc: SparkStreamContext)
@transient ssc: StreamingContext)
extends DStream[T](ssc) {

override def dependencies = List()

@@ -397,7 +397,7 @@ extends DStream[T](parents(0).ssc) {
}

if (parents.map(_.ssc).distinct.size > 1) {
throw new IllegalArgumentException("Array of parents have different SparkStreamContexts")
throw new IllegalArgumentException("Array of parents have different StreamingContexts")
}

if (parents.map(_.slideTime).distinct.size > 1) {
@@ -19,7 +19,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
ssc: SparkStreamContext,
ssc: StreamingContext,
directory: Path,
filter: PathFilter = FileInputDStream.defaultPathFilter,
newFilesOnly: Boolean = true)

@@ -28,7 +28,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
val fs = directory.getFileSystem(new Configuration())
var lastModTime: Long = 0

override def start() {
override def start() {
if (newFilesOnly) {
lastModTime = System.currentTimeMillis()
} else {

@@ -82,83 +82,3 @@ object FileInputDStream {
}
}

/*
class NetworkInputDStream[T: ClassManifest](
val networkInputName: String,
val addresses: Array[InetSocketAddress],
batchDuration: Time,
ssc: SparkStreamContext)
extends InputDStream[T](networkInputName, batchDuration, ssc) {

// TODO(Haoyuan): This is for the performance test.
@transient var rdd: RDD[T] = null

if (System.getProperty("spark.fake", "false") == "true") {
logInfo("Running initial count to cache fake RDD")
rdd = ssc.sc.textFile(SparkContext.inputFile,
SparkContext.idealPartitions).asInstanceOf[RDD[T]]
val fakeCacheLevel = System.getProperty("spark.fake.cache", "")
if (fakeCacheLevel == "MEMORY_ONLY_2") {
rdd.persist(StorageLevel.MEMORY_ONLY_2)
} else if (fakeCacheLevel == "MEMORY_ONLY_DESER_2") {
rdd.persist(StorageLevel.MEMORY_ONLY_2)
} else if (fakeCacheLevel != "") {
logError("Invalid fake cache level: " + fakeCacheLevel)
System.exit(1)
}
rdd.count()
}

@transient val references = new HashMap[Time,String]

override def compute(validTime: Time): Option[RDD[T]] = {
if (System.getProperty("spark.fake", "false") == "true") {
logInfo("Returning fake RDD at " + validTime)
return Some(rdd)
}
references.get(validTime) match {
case Some(reference) =>
if (reference.startsWith("file") || reference.startsWith("hdfs")) {
logInfo("Reading from file " + reference + " for time " + validTime)
Some(ssc.sc.textFile(reference).asInstanceOf[RDD[T]])
} else {
logInfo("Getting from BlockManager " + reference + " for time " + validTime)
Some(new BlockRDD(ssc.sc, Array(reference)))
}
case None =>
throw new Exception(this.toString + ": Reference missing for time " + validTime + "!!!")
None
}
}

def setReference(time: Time, reference: AnyRef) {
references += ((time, reference.toString))
logInfo("Reference added for time " + time + " - " + reference.toString)
}
}

class TestInputDStream(
val testInputName: String,
batchDuration: Time,
ssc: SparkStreamContext)
extends InputDStream[String](testInputName, batchDuration, ssc) {

@transient val references = new HashMap[Time,Array[String]]

override def compute(validTime: Time): Option[RDD[String]] = {
references.get(validTime) match {
case Some(reference) =>
Some(new BlockRDD[String](ssc.sc, reference))
case None =>
throw new Exception(this.toString + ": Reference missing for time " + validTime + "!!!")
None
}
}

def setReference(time: Time, reference: AnyRef) {
references += ((time, reference.asInstanceOf[Array[String]]))
}
}
*/
@@ -5,9 +5,9 @@ import spark.SparkEnv
import java.util.concurrent.Executors

class JobManager(ssc: SparkStreamContext, numThreads: Int = 1) extends Logging {
class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {

class JobHandler(ssc: SparkStreamContext, job: Job) extends Runnable {
class JobHandler(ssc: StreamingContext, job: Job) extends Runnable {
def run() {
SparkEnv.set(ssc.env)
try {
@@ -0,0 +1,36 @@
package spark.streaming

import akka.actor._
import akka.pattern.ask
import akka.util.duration._
import akka.dispatch._

import spark.RDD
import spark.BlockRDD
import spark.Logging

import java.io.InputStream

class NetworkInputDStream[T: ClassManifest](
@transient ssc: StreamingContext,
val host: String,
val port: Int,
val bytesToObjects: InputStream => Iterator[T]
) extends InputDStream[T](ssc) with Logging {

val id = ssc.getNewNetworkStreamId()

def start() { }

def stop() { }

override def compute(validTime: Time): Option[RDD[T]] = {
val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
return Some(new BlockRDD[T](ssc.sc, blockIds))
}

def createReceiver(): NetworkInputReceiver[T] = {
new NetworkInputReceiver(id, host, port, bytesToObjects)
}
}
@@ -0,0 +1,248 @@
package spark.streaming

import spark.Logging
import spark.storage.BlockManager
import spark.storage.StorageLevel
import spark.SparkEnv
import spark.streaming.util.SystemClock
import spark.streaming.util.RecurringTimer

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Queue
import scala.collection.mutable.SynchronizedPriorityQueue
import scala.math.Ordering

import java.net.InetSocketAddress
import java.net.Socket
import java.io.InputStream
import java.io.BufferedInputStream
import java.io.DataInputStream
import java.io.EOFException
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.ArrayBlockingQueue

import akka.actor._
import akka.pattern.ask
import akka.util.duration._
import akka.dispatch._

trait NetworkInputReceiverMessage
case class GetBlockIds(time: Long) extends NetworkInputReceiverMessage
case class GotBlockIds(streamId: Int, blocksIds: Array[String]) extends NetworkInputReceiverMessage
case class StopReceiver() extends NetworkInputReceiverMessage

class NetworkInputReceiver[T: ClassManifest](streamId: Int, host: String, port: Int, bytesToObjects: InputStream => Iterator[T])
extends Logging {

class ReceiverActor extends Actor {
override def preStart() = {
logInfo("Attempting to register")
val ip = System.getProperty("spark.master.host", "localhost")
val port = System.getProperty("spark.master.port", "7077").toInt
val actorName: String = "NetworkInputTracker"
val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName)
val trackerActor = env.actorSystem.actorFor(url)
val timeout = 100.milliseconds
val future = trackerActor.ask(RegisterReceiver(streamId, self))(timeout)
Await.result(future, timeout)
}

def receive = {
case GetBlockIds(time) => {
logInfo("Got request for block ids for " + time)
sender ! GotBlockIds(streamId, dataHandler.getPushedBlocks())
}

case StopReceiver() => {
if (receivingThread != null) {
receivingThread.interrupt()
}
sender ! true
}
}
}

class DataHandler {

class Block(val time: Long, val iterator: Iterator[T]) {
val blockId = "input-" + streamId + "-" + time
var pushed = true
override def toString() = "input block " + blockId
}

val clock = new SystemClock()
val blockInterval = 200L
val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
val blockOrdering = new Ordering[Block] {
def compare(b1: Block, b2: Block) = (b1.time - b2.time).toInt
}
val blockStorageLevel = StorageLevel.DISK_AND_MEMORY
val blocksForPushing = new ArrayBlockingQueue[Block](1000)
val blocksForReporting = new SynchronizedPriorityQueue[Block]()(blockOrdering)
val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

var currentBuffer = new ArrayBuffer[T]

def start() {
blockIntervalTimer.start()
blockPushingThread.start()
logInfo("Data handler started")
}

def stop() {
blockIntervalTimer.stop()
blockPushingThread.interrupt()
}

def += (obj: T) {
currentBuffer += obj
}

def updateCurrentBuffer(time: Long) {
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[T]
if (newBlockBuffer.size > 0) {
val newBlock = new Block(time - blockInterval, newBlockBuffer.toIterator)
blocksForPushing.add(newBlock)
blocksForReporting.enqueue(newBlock)
}
}

def keepPushingBlocks() {
logInfo("Block pushing thread started")
try {
while(true) {
val block = blocksForPushing.take()
if (blockManager != null) {
blockManager.put(block.blockId, block.iterator, blockStorageLevel)
block.pushed = true
} else {
logWarning(block + " not put as block manager is null")
}
}
} catch {
case ie: InterruptedException => println("Block pushing thread interrupted")
case e: Exception => e.printStackTrace()
}
}

def getPushedBlocks(): Array[String] = {
val pushedBlocks = new ArrayBuffer[String]()
var loop = true
while(loop && !blocksForReporting.isEmpty) {
val block = blocksForReporting.dequeue()
if (block == null) {
loop = false
} else if (!block.pushed) {
blocksForReporting.enqueue(block)
} else {
pushedBlocks += block.blockId
}
}
logInfo("Got " + pushedBlocks.size + " blocks")
pushedBlocks.toArray
}
}

val blockManager = if (SparkEnv.get != null) SparkEnv.get.blockManager else null
val dataHandler = new DataHandler()
val env = SparkEnv.get

var receiverActor: ActorRef = null
var receivingThread: Thread = null

def run() {
initLogging()
var socket: Socket = null
try {
if (SparkEnv.get != null) {
receiverActor = SparkEnv.get.actorSystem.actorOf(Props(new ReceiverActor), "ReceiverActor-" + streamId)
}
dataHandler.start()
socket = connect()
receivingThread = Thread.currentThread()
receive(socket)
} catch {
case ie: InterruptedException => logInfo("Receiver interrupted")
} finally {
receivingThread = null
if (socket != null) socket.close()
dataHandler.stop()
}
}

def connect(): Socket = {
logInfo("Connecting to " + host + ":" + port)
val socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
socket
}

def receive(socket: Socket) {
val iterator = bytesToObjects(socket.getInputStream())
while(iterator.hasNext) {
val obj = iterator.next
dataHandler += obj
}
}
}

object NetworkInputReceiver {

def bytesToLines(inputStream: InputStream): Iterator[String] = {
val bufferedInputStream = new BufferedInputStream(inputStream)
val dataInputStream = new DataInputStream(bufferedInputStream)

val iterator = new Iterator[String] {
var gotNext = false
var finished = false
var nextValue: String = null

private def getNext() {
try {
nextValue = dataInputStream.readLine()
println("[" + nextValue + "]")
} catch {
case eof: EOFException =>
finished = true
}
gotNext = true
}

override def hasNext: Boolean = {
if (!gotNext) {
getNext()
}
if (finished) {
dataInputStream.close()
}
!finished
}

override def next(): String = {
if (!gotNext) {
getNext()
}
if (finished) {
throw new NoSuchElementException("End of stream")
}
gotNext = false
nextValue
}
}
iterator
}

def main(args: Array[String]) {
if (args.length < 2) {
println("NetworkReceiver <hostname> <port>")
System.exit(1)
}
val host = args(0)
val port = args(1).toInt
val receiver = new NetworkInputReceiver(0, host, port, bytesToLines)
receiver.run()
}
}
@@ -0,0 +1,110 @@
package spark.streaming

import spark.Logging
import spark.SparkEnv

import scala.collection.mutable.HashMap

import akka.actor._
import akka.pattern.ask
import akka.util.duration._
import akka.dispatch._

trait NetworkInputTrackerMessage
case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage

class NetworkInputTracker(
@transient ssc: StreamingContext,
@transient networkInputStreams: Array[NetworkInputDStream[_]])
extends Logging {

class TrackerActor extends Actor {
def receive = {
case RegisterReceiver(streamId, receiverActor) => {
if (!networkInputStreamIds.contains(streamId)) {
throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
logInfo("Registered receiver for network stream " + streamId)
sender ! true
}
}
}

class ReceiverExecutor extends Thread {
val env = ssc.env

override def run() {
try {
SparkEnv.set(env)
startReceivers()
} catch {
case ie: InterruptedException => logInfo("ReceiverExecutor interrupted")
} finally {
stopReceivers()
}
}

def startReceivers() {
val tempRDD = ssc.sc.makeRDD(networkInputStreams, networkInputStreams.size)

val startReceiver = (iterator: Iterator[NetworkInputDStream[_]]) => {
if (!iterator.hasNext) {
throw new Exception("Could not start receiver as details not found.")
}
val stream = iterator.next
val receiver = stream.createReceiver()
receiver.run()
}

ssc.sc.runJob(tempRDD, startReceiver)
}

def stopReceivers() {
implicit val ec = env.actorSystem.dispatcher
val message = new StopReceiver()
val listOfFutures = receiverInfo.values.map(_.ask(message)(timeout)).toList
val futureOfList = Future.sequence(listOfFutures)
Await.result(futureOfList, timeout)
}
}

val networkInputStreamIds = networkInputStreams.map(_.id).toArray
val receiverExecutor = new ReceiverExecutor()
val receiverInfo = new HashMap[Int, ActorRef]
val receivedBlockIds = new HashMap[Int, Array[String]]
val timeout = 1000.milliseconds

var currentTime: Time = null

def start() {
ssc.env.actorSystem.actorOf(Props(new TrackerActor), "NetworkInputTracker")
receiverExecutor.start()
}

def stop() {
// stop the actor
receiverExecutor.interrupt()
}

def getBlockIds(receiverId: Int, time: Time): Array[String] = synchronized {
if (currentTime == null || time > currentTime) {
logInfo("Getting block ids from receivers for " + time)
implicit val ec = ssc.env.actorSystem.dispatcher
receivedBlockIds.clear()
val message = new GetBlockIds(time)
val listOfFutures = receiverInfo.values.map(
_.ask(message)(timeout).mapTo[GotBlockIds]
).toList
val futureOfList = Future.sequence(listOfFutures)
val allBlockIds = Await.result(futureOfList, timeout)
receivedBlockIds ++= allBlockIds.map(x => (x.streamId, x.blocksIds))
if (receivedBlockIds.size != receiverInfo.size) {
throw new Exception("Unexpected number of the Block IDs received")
}
currentTime = time
}
receivedBlockIds.getOrElse(receiverId, Array[String]())
}
}
@@ -1,7 +1,7 @@
package spark.streaming

import scala.collection.mutable.ArrayBuffer
import spark.streaming.SparkStreamContext._
import spark.streaming.StreamingContext._

class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)])
extends Serializable {

@@ -7,7 +7,7 @@ import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer

class QueueInputDStream[T: ClassManifest](
ssc: SparkStreamContext,
ssc: StreamingContext,
val queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]

@@ -1,6 +1,6 @@
package spark.streaming

import spark.streaming.SparkStreamContext._
import spark.streaming.StreamingContext._

import spark.RDD
import spark.UnionRDD
@@ -12,7 +12,7 @@ sealed trait SchedulerMessage
case class InputGenerated(inputName: String, interval: Interval, reference: AnyRef = null) extends SchedulerMessage

class Scheduler(
ssc: SparkStreamContext,
ssc: StreamingContext,
inputStreams: Array[InputDStream[_]],
outputStreams: Array[DStream[_]])
extends Logging {

@@ -40,6 +40,7 @@ extends Logging {
}

def generateRDDs (time: Time) {
println("\n-----------------------------------------------------\n")
logInfo("Generating RDDs for time " + time)
outputStreams.foreach(outputStream => {
outputStream.generateJob(time) match {
@@ -8,6 +8,7 @@ import spark.SparkContext
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Queue

import java.io.InputStream
import java.io.IOException
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicInteger

@@ -19,7 +20,7 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

class SparkStreamContext (
class StreamingContext (
master: String,
frameworkName: String,
val sparkHome: String = null,

@@ -33,8 +34,12 @@ class SparkStreamContext (

val inputStreams = new ArrayBuffer[InputDStream[_]]()
val outputStreams = new ArrayBuffer[DStream[_]]()
val nextNetworkInputStreamId = new AtomicInteger(0)

var batchDuration: Time = null
var scheduler: Scheduler = null
var networkInputTracker: NetworkInputTracker = null
var receiverJobThread: Thread = null

def setBatchDuration(duration: Long) {
setBatchDuration(Time(duration))
@@ -43,35 +48,32 @@ class SparkStreamContext (
def setBatchDuration(duration: Time) {
batchDuration = duration
}

/*
def createNetworkStream[T: ClassManifest](
name: String,
addresses: Array[InetSocketAddress],
batchDuration: Time): DStream[T] = {

val inputStream = new NetworkinputStream[T](this, addresses)
inputStreams += inputStream
inputStream
}

private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()

def createNetworkTextStream(hostname: String, port: Int): DStream[String] = {
createNetworkStream[String](hostname, port, NetworkInputReceiver.bytesToLines)
}

def createNetworkStream[T: ClassManifest](
name: String,
addresses: Array[String],
batchDuration: Long): DStream[T] = {

def stringToInetSocketAddress (str: String): InetSocketAddress = {
val parts = str.split(":")
if (parts.length != 2) {
throw new IllegalArgumentException ("Address format error")
}
new InetSocketAddress(parts(0), parts(1).toInt)
}

readNetworkStream(
name,
addresses.map(stringToInetSocketAddress).toArray,
LongTime(batchDuration))
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T]
): DStream[T] = {
val inputStream = new NetworkInputDStream[T](this, hostname, port, converter)
inputStreams += inputStream
inputStream
}

/*
def createHttpTextStream(url: String): DStream[String] = {
createHttpStream(url, NetworkInputReceiver.bytesToLines)
}

def createHttpStream[T: ClassManifest](
url: String,
converter: (InputStream) => Iterator[T]
): DStream[T] = {
}
*/
@@ -126,7 +128,7 @@ class SparkStreamContext (
/**
* This function verify whether the stream computation is eligible to be executed.
*/
def verify() {
private def verify() {
if (batchDuration == null) {
throw new Exception("Batch duration has not been set")
}

@@ -147,8 +149,21 @@ class SparkStreamContext (
*/
def start() {
verify()
val networkInputStreams = inputStreams.filter(s => s match {
case n: NetworkInputDStream[_] => true
case _ => false
}).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray

if (networkInputStreams.length > 0) {
// Start the network input tracker (must start before receivers)
networkInputTracker = new NetworkInputTracker(this, networkInputStreams)
networkInputTracker.start()
}

Thread.sleep(1000)
// Start the scheduler
scheduler = new Scheduler(this, inputStreams.toArray, outputStreams.toArray)
scheduler.start()
scheduler.start()
}

/**
@@ -156,18 +171,22 @@ class SparkStreamContext (
*/
def stop() {
try {
scheduler.stop()
if (scheduler != null) scheduler.stop()
if (networkInputTracker != null) networkInputTracker.stop()
if (receiverJobThread != null) receiverJobThread.interrupt()
sc.stop()
} catch {
case e: Exception => logWarning("Error while stopping", e)
}

logInfo("SparkStreamContext stopped")
logInfo("StreamingContext stopped")
}
}

object SparkStreamContext {
implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) =
new PairDStreamFunctions(stream)
object StreamingContext {
implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
new PairDStreamFunctions[K, V](stream)
}
}
@@ -1,6 +1,6 @@
package spark.streaming

class Time(private var millis: Long) {
class Time (private var millis: Long) extends Serializable {

def copy() = new Time(this.millis)

@@ -1,6 +1,6 @@
package spark.streaming

import spark.streaming.SparkStreamContext._
import spark.streaming.StreamingContext._

import spark.RDD
import spark.UnionRDD
@@ -1,8 +1,8 @@
package spark.streaming.examples

import spark.RDD
import spark.streaming.SparkStreamContext
import spark.streaming.SparkStreamContext._
import spark.streaming.StreamingContext
import spark.streaming.StreamingContext._
import spark.streaming.Seconds

import scala.collection.mutable.SynchronizedQueue

@@ -16,7 +16,7 @@ object ExampleOne {
}

// Create the context and set the batch size
val ssc = new SparkStreamContext(args(0), "ExampleOne")
val ssc = new StreamingContext(args(0), "ExampleOne")
ssc.setBatchDuration(Seconds(1))

// Create the queue through which RDDs can be pushed to

@@ -1,7 +1,7 @@
package spark.streaming.examples

import spark.streaming.SparkStreamContext
import spark.streaming.SparkStreamContext._
import spark.streaming.StreamingContext
import spark.streaming.StreamingContext._
import spark.streaming.Seconds
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration

@@ -15,7 +15,7 @@ object ExampleTwo {
}

// Create the context and set the batch size
val ssc = new SparkStreamContext(args(0), "ExampleTwo")
val ssc = new StreamingContext(args(0), "ExampleTwo")
ssc.setBatchDuration(Seconds(2))

// Create the new directory
@@ -1,17 +1,17 @@
package spark.streaming.examples

import spark.streaming.{Seconds, SparkStreamContext}
import spark.streaming.SparkStreamContext._
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

object WordCount {
object WordCountHdfs {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: WordCount <master> <directory>")
System.err.println("Usage: WordCountHdfs <master> <directory>")
System.exit(1)
}

// Create the context and set the batch size
val ssc = new SparkStreamContext(args(0), "ExampleTwo")
val ssc = new StreamingContext(args(0), "WordCountHdfs")
ssc.setBatchDuration(Seconds(2))

// Create the FileInputDStream on the directory and use the

@@ -23,3 +23,4 @@ object WordCount {
ssc.start()
}
}
@@ -0,0 +1,25 @@
package spark.streaming.examples

import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

object WordCountNetwork {
  def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: WordCountNetwork <master> <hostname> <port>")
System.exit(1)
}

// Create the context and set the batch size
val ssc = new StreamingContext(args(0), "WordCountNetwork")
ssc.setBatchDuration(Seconds(2))

// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
val lines = ssc.createNetworkTextStream(args(1), args(2).toInt)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
}
}
@@ -12,7 +12,7 @@ import scala.collection.mutable.SynchronizedQueue

class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {

var ssc: SparkStreamContext = null
var ssc: StreamingContext = null
val batchDurationMillis = 1000

System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")

@@ -22,7 +22,7 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]]) {
try {
ssc = new SparkStreamContext("local", "test")
ssc = new StreamingContext("local", "test")
ssc.setBatchDuration(Milliseconds(batchDurationMillis))

val inputStream = ssc.createQueueStream(input.map(ssc.sc.makeRDD(_, 2)).toIterator)