Added documentation for setJobGroup. Also some minor cleanup in SparkContext.

Reynold Xin 2013-10-19 23:16:44 -07:00
parent f628804c02
commit eb9bf69462


@@ -51,25 +51,20 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.mesos.MesosNativeLibrary
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.LocalSparkCluster
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler}
import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
SparkDeploySchedulerBackend, ClusterScheduler}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util._
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.RDDInfo
import org.apache.spark.storage.StorageStatus
import scala.Some
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.RDDInfo
import org.apache.spark.storage.StorageStatus
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
TimeStampedHashMap, Utils}
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -125,7 +120,7 @@ class SparkContext(
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup)
// Initalize the Spark UI
// Initialize the Spark UI
private[spark] val ui = new SparkUI(this)
ui.bind()
@@ -292,11 +287,31 @@ class SparkContext(
setJobGroup("", value)
}
/**
* Assigns a group id to all the jobs started by this thread until the group id is set to a
* different value or cleared.
*
* Often, a unit of execution in an application consists of multiple Spark actions or jobs.
* Application programmers can use this method to group all those jobs together and give the
* group a description. Once set, the Spark web UI will associate such jobs with this group.
*
* The application can also use [[org.apache.spark.SparkContext.cancelJobGroup]] to cancel all
* running jobs in this group. For example,
* {{{
* // In the main thread:
* sc.setJobGroup("some_job_to_cancel", "some job description")
* sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
*
* // In a separate thread:
* sc.cancelJobGroup("some_job_to_cancel")
* }}}
*/
def setJobGroup(groupId: String, description: String) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
}
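// Usage sketch (not part of this diff): a standalone driver that tags a long-running
// job with a group id and then cancels it from a second thread, mirroring the Scaladoc
// example above. The master URL, app name, and sleep durations are illustrative only;
// the sleeps just keep the job alive long enough to be cancelled.
object JobGroupCancellationExample {
  import org.apache.spark.SparkContext

  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "job-group-example")

    // All jobs submitted from this thread now carry the group id below.
    sc.setJobGroup("some_job_to_cancel", "long count used to demonstrate cancellation")

    // Cancel the whole group from a separate thread after one second.
    new Thread(new Runnable {
      def run() {
        Thread.sleep(1000)
        sc.cancelJobGroup("some_job_to_cancel")
      }
    }).start()

    // This count() runs under the group id set above; once the group is cancelled the
    // action is expected to abort with an exception rather than return a result.
    try {
      sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
    } catch {
      case e: Exception => println("count() aborted after cancellation: " + e.getMessage)
    }

    sc.stop()
  }
}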
/** Clear the job group id and its description. */
def clearJobGroup() {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
@@ -881,13 +896,15 @@ class SparkContext(
new SimpleFutureAction(waiter, resultFunc)
}
/**
* Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]]
* for more information.
*/
def cancelJobGroup(groupId: String) {
dagScheduler.cancelJobGroup(groupId)
}
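// Scoping sketch (not part of this diff), assuming a SparkContext `sc` is already in
// scope: cancelJobGroup only affects jobs whose submitting thread set the matching
// group id, so work running under a different group id keeps going. The group names
// and sleeps are illustrative only.
def runGroupedCount(sc: SparkContext, group: String): Thread = {
  val t = new Thread(new Runnable {
    def run() {
      sc.setJobGroup(group, "demo job for " + group)  // the group id is a per-thread property
      sc.parallelize(1 to 1000, 2).map { i => Thread.sleep(10); i }.count()
    }
  })
  t.start()
  t
}

val a = runGroupedCount(sc, "group_a")
val b = runGroupedCount(sc, "group_b")
Thread.sleep(500)
sc.cancelJobGroup("group_a")  // aborts the "group_a" job; "group_b" should finish normally
b.join()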
/**
* Cancel all jobs that have been scheduled or are running.
*/
/** Cancel all jobs that have been scheduled or are running. */
def cancelAllJobs() {
dagScheduler.cancelAllJobs()
}
@@ -949,9 +966,9 @@
*/
object SparkContext {
val SPARK_JOB_DESCRIPTION = "spark.job.description"
private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
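// With these keys now private[spark], application code goes through the public job
// group API rather than setting the local properties by name. A minimal sketch,
// assuming a SparkContext `sc` and an illustrative group id:
sc.setJobGroup("nightly_etl", "nightly ETL jobs")  // tags jobs from this thread with the group
sc.parallelize(1 to 100).count()                   // associated with "nightly ETL jobs" in the web UI
sc.clearJobGroup()                                 // later jobs from this thread are no longer grouped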
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2