Merge branch 'apache-master' into project-refactor

This commit is contained in:
Tathagata Das 2013-12-31 00:43:38 -08:00
commit 977bcc36d4
14 changed files with 52 additions and 67 deletions

View file

@@ -53,5 +53,3 @@ private[spark] case class ExceptionFailure(
private[spark] case object TaskResultLost extends TaskEndReason private[spark] case object TaskResultLost extends TaskEndReason
private[spark] case object TaskKilled extends TaskEndReason private[spark] case object TaskKilled extends TaskEndReason
private[spark] case class OtherFailure(message: String) extends TaskEndReason

View file

@@ -140,12 +140,12 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
<body> <body>
{linkToMaster} {linkToMaster}
<div> <div>
<div style="float:left;width:40%">{backButton}</div> <div style="float:left; margin-right:10px">{backButton}</div>
<div style="float:left;">{range}</div> <div style="float:left;">{range}</div>
<div style="float:right;">{nextButton}</div> <div style="float:right; margin-left:10px">{nextButton}</div>
</div> </div>
<br /> <br />
<div style="height:500px;overflow:auto;padding:5px;"> <div style="height:500px; overflow:auto; padding:5px;">
<pre>{logText}</pre> <pre>{logText}</pre>
</div> </div>
</body> </body>

View file

@@ -152,7 +152,8 @@ class DAGScheduler(
val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done
val running = new HashSet[Stage] // Stages we are running right now val running = new HashSet[Stage] // Stages we are running right now
val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures
val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]] // Missing tasks from each stage // Missing tasks from each stage
val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]]
var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits
val activeJobs = new HashSet[ActiveJob] val activeJobs = new HashSet[ActiveJob]
@@ -239,7 +240,8 @@ class DAGScheduler(
shuffleToMapStage.get(shuffleDep.shuffleId) match { shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage case Some(stage) => stage
case None => case None =>
val stage = newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId) val stage =
newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage shuffleToMapStage(shuffleDep.shuffleId) = stage
stage stage
} }
@@ -248,7 +250,8 @@ class DAGScheduler(
/** /**
* Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation * Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
* of a shuffle map stage in newOrUsedStage. The stage will be associated with the provided * of a shuffle map stage in newOrUsedStage. The stage will be associated with the provided
* jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage directly. * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
* directly.
*/ */
private def newStage( private def newStage(
rdd: RDD[_], rdd: RDD[_],
@@ -358,7 +361,8 @@ class DAGScheduler(
stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId
jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
val parents = getParentStages(s.rdd, jobId) val parents = getParentStages(s.rdd, jobId)
val parentsWithoutThisJobId = parents.filter(p => !stageIdToJobIds.get(p.id).exists(_.contains(jobId))) val parentsWithoutThisJobId = parents.filter(p =>
!stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail) updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
} }
} }
@@ -366,8 +370,9 @@ class DAGScheduler(
} }
/** /**
* Removes job and any stages that are not needed by any other job. Returns the set of ids for stages that * Removes job and any stages that are not needed by any other job. Returns the set of ids for
* were removed. The associated tasks for those stages need to be cancelled if we got here via job cancellation. * stages that were removed. The associated tasks for those stages need to be cancelled if we
* got here via job cancellation.
*/ */
private def removeJobAndIndependentStages(jobId: Int): Set[Int] = { private def removeJobAndIndependentStages(jobId: Int): Set[Int] = {
val registeredStages = jobIdToStageIds(jobId) val registeredStages = jobIdToStageIds(jobId)
@@ -378,7 +383,8 @@ class DAGScheduler(
stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach { stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach {
case (stageId, jobSet) => case (stageId, jobSet) =>
if (!jobSet.contains(jobId)) { if (!jobSet.contains(jobId)) {
logError("Job %d not registered for stage %d even though that stage was registered for the job" logError(
"Job %d not registered for stage %d even though that stage was registered for the job"
.format(jobId, stageId)) .format(jobId, stageId))
} else { } else {
def removeStage(stageId: Int) { def removeStage(stageId: Int) {
@@ -389,7 +395,8 @@ class DAGScheduler(
running -= s running -= s
} }
stageToInfos -= s stageToInfos -= s
shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleToMapStage.remove) shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
shuffleToMapStage.remove(shuffleId))
if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) { if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
logDebug("Removing pending status for stage %d".format(stageId)) logDebug("Removing pending status for stage %d".format(stageId))
} }
@@ -407,7 +414,8 @@ class DAGScheduler(
stageIdToStage -= stageId stageIdToStage -= stageId
stageIdToJobIds -= stageId stageIdToJobIds -= stageId
logDebug("After removal of stage %d, remaining stages = %d".format(stageId, stageIdToStage.size)) logDebug("After removal of stage %d, remaining stages = %d"
.format(stageId, stageIdToStage.size))
} }
jobSet -= jobId jobSet -= jobId
@@ -459,7 +467,8 @@ class DAGScheduler(
assert(partitions.size > 0) assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties) eventProcessActor ! JobSubmitted(
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
waiter waiter
} }
@@ -494,7 +503,8 @@ class DAGScheduler(
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.size).toArray val partitions = (0 until rdd.partitions.size).toArray
val jobId = nextJobId.getAndIncrement() val jobId = nextJobId.getAndIncrement()
eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties) eventProcessActor ! JobSubmitted(
jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties)
listener.awaitResult() // Will throw an exception if the job fails listener.awaitResult() // Will throw an exception if the job fails
} }
@@ -529,8 +539,8 @@ class DAGScheduler(
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
var finalStage: Stage = null var finalStage: Stage = null
try { try {
// New stage creation at times and if its not protected, the scheduler thread is killed. // New stage creation may throw an exception if, for example, jobs are run on a HadoopRDD
// e.g. it can fail when jobs are run on HadoopRDD whose underlying hdfs files have been deleted // whose underlying HDFS files have been deleted.
finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite)) finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
} catch { } catch {
case e: Exception => case e: Exception =>
@@ -563,7 +573,8 @@ class DAGScheduler(
case JobGroupCancelled(groupId) => case JobGroupCancelled(groupId) =>
// Cancel all jobs belonging to this job group. // Cancel all jobs belonging to this job group.
// First finds all active jobs with this group id, and then kill stages for them. // First finds all active jobs with this group id, and then kill stages for them.
val activeInGroup = activeJobs.filter(groupId == _.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) val activeInGroup = activeJobs.filter(activeJob =>
groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
val jobIds = activeInGroup.map(_.jobId) val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach { handleJobCancellation } jobIds.foreach { handleJobCancellation }
@@ -585,7 +596,8 @@ class DAGScheduler(
stage <- stageIdToStage.get(task.stageId); stage <- stageIdToStage.get(task.stageId);
stageInfo <- stageToInfos.get(stage) stageInfo <- stageToInfos.get(stage)
) { ) {
if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 && !stageInfo.emittedTaskSizeWarning) { if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 &&
!stageInfo.emittedTaskSizeWarning) {
stageInfo.emittedTaskSizeWarning = true stageInfo.emittedTaskSizeWarning = true
logWarning(("Stage %d (%s) contains a task of very large " + logWarning(("Stage %d (%s) contains a task of very large " +
"size (%d KB). The maximum recommended task size is %d KB.").format( "size (%d KB). The maximum recommended task size is %d KB.").format(
@@ -815,7 +827,7 @@ class DAGScheduler(
} }
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stageToInfos(stage).completionTime = Some(System.currentTimeMillis()) stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
listenerBus.post(StageCompleted(stageToInfos(stage))) listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
running -= stage running -= stage
} }
event.reason match { event.reason match {

View file

@@ -297,7 +297,7 @@ class JobLogger(val user: String, val logDirName: String)
* When stage is completed, record stage completion status * When stage is completed, record stage completion status
* @param stageCompleted Stage completed event * @param stageCompleted Stage completed event
*/ */
override def onStageCompleted(stageCompleted: StageCompleted) { override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format( stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
stageCompleted.stage.stageId)) stageCompleted.stage.stageId))
} }
@@ -328,10 +328,6 @@ class JobLogger(val user: String, val logDirName: String)
task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" + task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
mapId + " REDUCE_ID=" + reduceId mapId + " REDUCE_ID=" + reduceId
stageLogInfo(task.stageId, taskStatus) stageLogInfo(task.stageId, taskStatus)
case OtherFailure(message) =>
taskStatus += " STATUS=FAILURE TID=" + taskInfo.taskId +
" STAGE_ID=" + task.stageId + " INFO=" + message
stageLogInfo(task.stageId, taskStatus)
case _ => case _ =>
} }
} }

View file

@@ -117,8 +117,4 @@ private[spark] class Pool(
parent.decreaseRunningTasks(taskNum) parent.decreaseRunningTasks(taskNum)
} }
} }
override def hasPendingTasks(): Boolean = {
schedulableQueue.exists(_.hasPendingTasks())
}
} }

View file

@@ -42,5 +42,4 @@ private[spark] trait Schedulable {
def executorLost(executorId: String, host: String): Unit def executorLost(executorId: String, host: String): Unit
def checkSpeculatableTasks(): Boolean def checkSpeculatableTasks(): Boolean
def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
def hasPendingTasks(): Boolean
} }

View file

@@ -27,7 +27,7 @@ sealed trait SparkListenerEvents
case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties) case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties)
extends SparkListenerEvents extends SparkListenerEvents
case class StageCompleted(val stage: StageInfo) extends SparkListenerEvents case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents
case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
@@ -47,7 +47,7 @@ trait SparkListener {
/** /**
* Called when a stage is completed, with information on the completed stage * Called when a stage is completed, with information on the completed stage
*/ */
def onStageCompleted(stageCompleted: StageCompleted) { } def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }
/** /**
* Called when a stage is submitted * Called when a stage is submitted
@@ -86,7 +86,7 @@ trait SparkListener {
* Simple SparkListener that logs a few summary statistics when each stage completes * Simple SparkListener that logs a few summary statistics when each stage completes
*/ */
class StatsReportListener extends SparkListener with Logging { class StatsReportListener extends SparkListener with Logging {
override def onStageCompleted(stageCompleted: StageCompleted) { override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
import org.apache.spark.scheduler.StatsReportListener._ import org.apache.spark.scheduler.StatsReportListener._
implicit val sc = stageCompleted implicit val sc = stageCompleted
this.logInfo("Finished stage: " + stageCompleted.stage) this.logInfo("Finished stage: " + stageCompleted.stage)
@@ -119,13 +119,17 @@ object StatsReportListener extends Logging {
val probabilities = percentiles.map{_ / 100.0} val probabilities = percentiles.map{_ / 100.0}
val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%" val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = { def extractDoubleDistribution(stage: SparkListenerStageCompleted,
getMetric: (TaskInfo,TaskMetrics) => Option[Double])
: Option[Distribution] = {
Distribution(stage.stage.taskInfos.flatMap { Distribution(stage.stage.taskInfos.flatMap {
case ((info,metric)) => getMetric(info, metric)}) case ((info,metric)) => getMetric(info, metric)})
} }
//is there some way to setup the types that I can get rid of this completely? //is there some way to setup the types that I can get rid of this completely?
def extractLongDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Long]): Option[Distribution] = { def extractLongDistribution(stage: SparkListenerStageCompleted,
getMetric: (TaskInfo,TaskMetrics) => Option[Long])
: Option[Distribution] = {
extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble}) extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble})
} }
@@ -147,12 +151,12 @@ object StatsReportListener extends Logging {
} }
def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double]) def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double])
(implicit stage: StageCompleted) { (implicit stage: SparkListenerStageCompleted) {
showDistribution(heading, extractDoubleDistribution(stage, getMetric), format) showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
} }
def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long]) def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long])
(implicit stage: StageCompleted) { (implicit stage: SparkListenerStageCompleted) {
showBytesDistribution(heading, extractLongDistribution(stage, getMetric)) showBytesDistribution(heading, extractLongDistribution(stage, getMetric))
} }
@@ -169,7 +173,7 @@ object StatsReportListener extends Logging {
} }
def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long]) def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
(implicit stage: StageCompleted) { (implicit stage: SparkListenerStageCompleted) {
showMillisDistribution(heading, extractLongDistribution(stage, getMetric)) showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
} }

View file

@@ -41,7 +41,7 @@ private[spark] class SparkListenerBus() extends Logging {
event match { event match {
case stageSubmitted: SparkListenerStageSubmitted => case stageSubmitted: SparkListenerStageSubmitted =>
sparkListeners.foreach(_.onStageSubmitted(stageSubmitted)) sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
case stageCompleted: StageCompleted => case stageCompleted: SparkListenerStageCompleted =>
sparkListeners.foreach(_.onStageCompleted(stageCompleted)) sparkListeners.foreach(_.onStageCompleted(stageCompleted))
case jobStart: SparkListenerJobStart => case jobStart: SparkListenerJobStart =>
sparkListeners.foreach(_.onJobStart(jobStart)) sparkListeners.foreach(_.onJobStart(jobStart))

View file

@@ -365,13 +365,6 @@ private[spark] class TaskSchedulerImpl(
} }
} }
// Check for pending tasks in all our active jobs.
def hasPendingTasks: Boolean = {
synchronized {
rootPool.hasPendingTasks()
}
}
def executorLost(executorId: String, reason: ExecutorLossReason) { def executorLost(executorId: String, reason: ExecutorLossReason) {
var failedExecutor: Option[String] = None var failedExecutor: Option[String] = None

View file

@@ -112,10 +112,6 @@ private[spark] class TaskSetManager(
// Task index, start and finish time for each task attempt (indexed by task ID) // Task index, start and finish time for each task attempt (indexed by task ID)
val taskInfos = new HashMap[Long, TaskInfo] val taskInfos = new HashMap[Long, TaskInfo]
// Did the TaskSet fail?
var failed = false
var causeOfFailure = ""
// How frequently to reprint duplicate exceptions in full, in milliseconds // How frequently to reprint duplicate exceptions in full, in milliseconds
val EXCEPTION_PRINT_INTERVAL = val EXCEPTION_PRINT_INTERVAL =
System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
@@ -556,8 +552,6 @@ private[spark] class TaskSetManager(
} }
def abort(message: String) { def abort(message: String) {
failed = true
causeOfFailure = message
// TODO: Kill running tasks if we were not terminated due to a Mesos error // TODO: Kill running tasks if we were not terminated due to a Mesos error
sched.dagScheduler.taskSetFailed(taskSet, message) sched.dagScheduler.taskSetFailed(taskSet, message)
removeAllRunningTasks() removeAllRunningTasks()
@@ -681,10 +675,6 @@ private[spark] class TaskSetManager(
return foundTasks return foundTasks
} }
override def hasPendingTasks(): Boolean = {
numTasks > 0 && tasksSuccessful < numTasks
}
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = { private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
val defaultWait = System.getProperty("spark.locality.wait", "3000") val defaultWait = System.getProperty("spark.locality.wait", "3000")
level match { level match {

View file

@@ -61,7 +61,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
override def onJobStart(jobStart: SparkListenerJobStart) {} override def onJobStart(jobStart: SparkListenerJobStart) {}
override def onStageCompleted(stageCompleted: StageCompleted) = synchronized { override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
val stage = stageCompleted.stage val stage = stageCompleted.stage
poolToActiveStages(stageIdToPool(stage.stageId)) -= stage poolToActiveStages(stageIdToPool(stage.stageId)) -= stage
activeStages -= stage activeStages -= stage
@@ -146,12 +146,9 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
// update duration // update duration
y.taskTime += taskEnd.taskInfo.duration y.taskTime += taskEnd.taskInfo.duration
taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead => Option(taskEnd.taskMetrics).foreach { taskMetrics =>
y.shuffleRead += shuffleRead.remoteBytesRead taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
} taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite =>
y.shuffleWrite += shuffleWrite.shuffleBytesWritten
} }
} }
case _ => {} case _ => {}

View file

@@ -117,7 +117,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = onTaskEndCount += 1 override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = onTaskEndCount += 1
override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1 override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1
override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1 override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1 override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = onStageCompletedCount += 1
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1 override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
} }
sc.addSparkListener(joblogger) sc.addSparkListener(joblogger)

View file

@@ -174,7 +174,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
class SaveStageInfo extends SparkListener { class SaveStageInfo extends SparkListener {
val stageInfos = Buffer[StageInfo]() val stageInfos = Buffer[StageInfo]()
override def onStageCompleted(stage: StageCompleted) { override def onStageCompleted(stage: SparkListenerStageCompleted) {
stageInfos += stage.stage stageInfos += stage.stage
} }
} }

View file

@@ -17,7 +17,7 @@ rem See the License for the specific language governing permissions and
rem limitations under the License. rem limitations under the License.
rem rem
set SCALA_VERSION=2.9.3 set SCALA_VERSION=2.10
rem Figure out where the Spark framework is installed rem Figure out where the Spark framework is installed
set FWDIR=%~dp0 set FWDIR=%~dp0