Allow SparkContext.submitJob to submit a job for only a subset of the partitions.

This commit is contained in:
Reynold Xin 2013-09-18 04:16:18 -07:00
parent 37d8f37a8e
commit bf515688e7
3 changed files with 5 additions and 4 deletions

View file

@@ -816,6 +816,7 @@ class SparkContext(
def submitJob[T, U, R]( def submitJob[T, U, R](
rdd: RDD[T], rdd: RDD[T],
processPartition: Iterator[T] => U, processPartition: Iterator[T] => U,
partitions: Seq[Int],
partitionResultHandler: (Int, U) => Unit, partitionResultHandler: (Int, U) => Unit,
resultFunc: () => R): Future[R] = resultFunc: () => R): Future[R] =
{ {
@@ -823,7 +824,7 @@ class SparkContext(
val waiter = dagScheduler.submitJob( val waiter = dagScheduler.submitJob(
rdd, rdd,
(context: TaskContext, iter: Iterator[T]) => processPartition(iter), (context: TaskContext, iter: Iterator[T]) => processPartition(iter),
0 until rdd.partitions.size, partitions,
callSite, callSite,
allowLocal = false, allowLocal = false,
partitionResultHandler, partitionResultHandler,

View file

@@ -568,7 +568,7 @@ abstract class RDD[T: ClassManifest](
def collectAsync(): Future[Seq[T]] = { def collectAsync(): Future[Seq[T]] = {
val results = new ArrayBuffer[T] val results = new ArrayBuffer[T]
sc.submitJob[T, Array[T], Seq[T]]( sc.submitJob[T, Array[T], Seq[T]](
this, _.toArray, (index, data) => results ++= data, () => results) this, _.toArray, Range(0, partitions.size), (index, data) => results ++= data, () => results)
} }
/** /**

View file

@@ -42,11 +42,11 @@ import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
* locations to run each task on, based on the current cache status, and passes these to the * locations to run each task on, based on the current cache status, and passes these to the
* low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
* lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
* not caused by shuffie file loss are handled by the TaskScheduler, which will retry each task * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
* a small number of times before cancelling the whole stage. * a small number of times before cancelling the whole stage.
* *
* THREADING: This class runs all its logic in a single thread executing the run() method, to which * THREADING: This class runs all its logic in a single thread executing the run() method, to which
* events are submitted using a synchonized queue (eventQueue). The public API methods, such as * events are submitted using a synchronized queue (eventQueue). The public API methods, such as
* runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods
* should be private. * should be private.
*/ */