Restructured file locations to separate examples and other programs from core programs.

Tathagata Das 2012-07-30 13:29:13 -07:00
parent fcee4153b9
commit 5a26ca4a80
28 changed files with 25 additions and 157 deletions

View file

@@ -42,26 +42,6 @@ case class Interval (val beginTime: Time, val endTime: Time) {
 
 object Interval {
-  /*
-  implicit def longTupleToInterval (longTuple: (Long, Long)) =
-    Interval(longTuple._1, longTuple._2)
-
-  implicit def intTupleToInterval (intTuple: (Int, Int)) =
-    Interval(intTuple._1, intTuple._2)
-
-  implicit def string2Interval (str: String): Interval = {
-    val parts = str.split(",")
-    if (parts.length == 1)
-      return Interval.zero
-    return Interval (parts(0).toInt, parts(1).toInt)
-  }
-
-  def getInterval (timeMs: Long, intervalDurationMs: Long): Interval = {
-    val intervalBeginMs = timeMs / intervalDurationMs * intervalDurationMs
-    Interval(intervalBeginMs, intervalBeginMs + intervalDurationMs)
-  }
-  */
-
   def zero() = new Interval (Time.zero, Time.zero)
 
   def currentInterval(intervalDuration: LongTime): Interval = {

View file

@@ -1,112 +1,37 @@
 package spark.streaming
 
-import spark.SparkEnv
-import spark.Logging
-
-import scala.collection.mutable.PriorityQueue
-import scala.actors._
-import scala.actors.Actor._
-import scala.actors.remote._
-import scala.actors.remote.RemoteActor._
-import scala.actors.scheduler.ResizableThreadPoolScheduler
-import scala.actors.scheduler.ForkJoinScheduler
-
-sealed trait JobManagerMessage
-case class RunJob(job: Job) extends JobManagerMessage
-case class JobCompleted(handlerId: Int) extends JobManagerMessage
-
-class JobHandler(ssc: SparkStreamContext, val id: Int) extends DaemonActor with Logging {
-  var busy = false
-
-  def act() {
-    loop {
-      receive {
-        case job: Job => {
-          SparkEnv.set(ssc.env)
-          try {
-            logInfo("Starting " + job)
-            job.run()
-            logInfo("Finished " + job)
-            if (job.time.isInstanceOf[LongTime]) {
-              val longTime = job.time.asInstanceOf[LongTime]
-              logInfo("Total pushing + skew + processing delay for " + longTime + " is " +
-                (System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s")
-            }
-          } catch {
-            case e: Exception => logError("SparkStream job failed", e)
-          }
-          busy = false
-          reply(JobCompleted(id))
-        }
-      }
-    }
-  }
-}
-
-class JobManager(ssc: SparkStreamContext, numThreads: Int = 2) extends DaemonActor with Logging {
-  implicit private val jobOrdering = new Ordering[Job] {
-    override def compare(job1: Job, job2: Job): Int = {
-      if (job1.time < job2.time) {
-        return 1
-      } else if (job2.time < job1.time) {
-        return -1
-      } else {
-        return 0
-      }
-    }
-  }
-
-  private val jobs = new PriorityQueue[Job]()
-  private val handlers = (0 until numThreads).map(i => new JobHandler(ssc, i))
-
-  initLogging()
-
-  def act() {
-    handlers.foreach(_.start)
-    loop {
-      receive {
-        case RunJob(job) => {
-          jobs += job
-          logInfo("Job " + job + " submitted")
-          runJob()
-        }
-        case JobCompleted(handlerId) => {
-          runJob()
-        }
-      }
-    }
-  }
-
-  def runJob(): Unit = {
-    logInfo("Attempting to allocate job ")
-    if (jobs.size > 0) {
-      handlers.find(!_.busy).foreach(handler => {
-        val job = jobs.dequeue
-        logInfo("Allocating job " + job + " to handler " + handler.id)
-        handler.busy = true
-        handler ! job
-      })
-    }
-  }
-}
-
-object JobManager {
-  def main(args: Array[String]) {
-    val ssc = new SparkStreamContext("local[4]", "JobManagerTest")
-    val jobManager = new JobManager(ssc)
-    jobManager.start()
-    val t = System.currentTimeMillis
-    for (i <- 1 to 10) {
-      jobManager ! RunJob(new Job(
-        LongTime(i),
-        () => {
-          Thread.sleep(500)
-          println("Job " + i + " took " + (System.currentTimeMillis - t) + " ms")
-        }
-      ))
-    }
-    Thread.sleep(6000)
-  }
-}
+import spark.{Logging, SparkEnv}
+import java.util.concurrent.Executors
+
+class JobManager(ssc: SparkStreamContext, numThreads: Int = 1) extends Logging {
+
+  class JobHandler(ssc: SparkStreamContext, job: Job) extends Runnable {
+    def run() {
+      SparkEnv.set(ssc.env)
+      try {
+        logInfo("Starting " + job)
+        job.run()
+        logInfo("Finished " + job)
+        if (job.time.isInstanceOf[LongTime]) {
+          val longTime = job.time.asInstanceOf[LongTime]
+          logInfo("Total notification + skew + processing delay for " + longTime + " is " +
+            (System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s")
+          if (System.getProperty("spark.stream.distributed", "false") == "true") {
+            TestInputBlockTracker.setEndTime(job.time)
+          }
+        }
+      } catch {
+        case e: Exception => logError("SparkStream job failed", e)
+      }
+    }
+  }
+
+  initLogging()
+
+  val jobExecutor = Executors.newFixedThreadPool(numThreads)
+
+  def runJob(job: Job) {
+    jobExecutor.execute(new JobHandler(ssc, job))
+  }
+}

View file

@@ -1,37 +0,0 @@
-package spark.streaming
-
-import spark.{Logging, SparkEnv}
-import java.util.concurrent.Executors
-
-class JobManager2(ssc: SparkStreamContext, numThreads: Int = 1) extends Logging {
-
-  class JobHandler(ssc: SparkStreamContext, job: Job) extends Runnable {
-    def run() {
-      SparkEnv.set(ssc.env)
-      try {
-        logInfo("Starting " + job)
-        job.run()
-        logInfo("Finished " + job)
-        if (job.time.isInstanceOf[LongTime]) {
-          val longTime = job.time.asInstanceOf[LongTime]
-          logInfo("Total notification + skew + processing delay for " + longTime + " is " +
-            (System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s")
-          if (System.getProperty("spark.stream.distributed", "false") == "true") {
-            TestInputBlockTracker.setEndTime(job.time)
-          }
-        }
-      } catch {
-        case e: Exception => logError("SparkStream job failed", e)
-      }
-    }
-  }
-
-  initLogging()
-
-  val jobExecutor = Executors.newFixedThreadPool(numThreads)
-
-  def runJob(job: Job) {
-    jobExecutor.execute(new JobHandler(ssc, job))
-  }
-}

View file

@@ -46,7 +46,7 @@ extends Actor with Logging {
   val inputNames = inputRDSs.map(_.inputName).toArray
   val inputStates = new HashMap[Interval, InputState]()
   val currentJobs = System.getProperty("spark.stream.currentJobs", "1").toInt
-  val jobManager = new JobManager2(ssc, currentJobs)
+  val jobManager = new JobManager(ssc, currentJobs)
   // TODO(Haoyuan): The following line is for performance test only.
   var cnt: Int = System.getProperty("spark.stream.fake.cnt", "60").toInt
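
Note: the net effect of the JobManager changes above is to drop the actor-based priority queue and run each submitted Job on a plain java.util.concurrent fixed thread pool. Below is a minimal, self-contained sketch of that execution pattern, outside the diff; SimpleJob and SimpleJobManager are illustrative stand-ins, not classes from this commit, and error handling is reduced to a log line.

import java.util.concurrent.{Executors, TimeUnit}

// Illustrative stand-in for a streaming Job: a timestamped unit of work.
class SimpleJob(val time: Long, body: () => Unit) {
  def run() { body() }
}

// Sketch of the thread-pool pattern adopted here: wrap each job in a Runnable
// and hand it to a fixed-size executor instead of messaging an actor.
class SimpleJobManager(numThreads: Int = 1) {
  private val jobExecutor = Executors.newFixedThreadPool(numThreads)

  def runJob(job: SimpleJob) {
    jobExecutor.execute(new Runnable {
      def run() {
        try {
          job.run()
        } catch {
          case e: Exception => println("Job failed: " + e)
        }
      }
    })
  }

  def stop() {
    jobExecutor.shutdown()
    jobExecutor.awaitTermination(10, TimeUnit.SECONDS)
  }
}

// Usage: submit a few jobs, then let queued work drain and shut down.
object SimpleJobManagerTest {
  def main(args: Array[String]) {
    val manager = new SimpleJobManager(2)
    for (i <- 1 to 5) {
      manager.runJob(new SimpleJob(System.currentTimeMillis, () => println("Ran job " + i)))
    }
    manager.stop()
  }
}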