Revert "[SPARK-1021] Defer the data-driven computation of partition bounds in so..."

This reverts commit 2d972fd84a.

The reverted commit was hanging the correlationoptimizer14 test.
Author: Reynold Xin
Date:   2014-09-28 18:33:11 -07:00
Parent: 1f13a40ccd
Commit: 8e874185ed

3 changed files with 34 additions and 66 deletions

core/src/main/scala/org/apache/spark/FutureAction.scala

@@ -208,7 +208,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
       processPartition: Iterator[T] => U,
       partitions: Seq[Int],
       resultHandler: (Int, U) => Unit,
-      resultFunc: => R): R = {
+      resultFunc: => R) {
     // If the action hasn't been cancelled yet, submit the job. The check and the submitJob
     // command need to be in an atomic block.
     val job = this.synchronized {
@@ -223,10 +223,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
     // cancel the job and stop the execution. This is not in a synchronized block because
     // Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
     try {
-      Await.ready(job, Duration.Inf).value.get match {
-        case scala.util.Failure(e) => throw e
-        case scala.util.Success(v) => v
-      }
+      Await.ready(job, Duration.Inf)
     } catch {
       case e: InterruptedException =>
         job.cancel()

core/src/main/scala/org/apache/spark/Partitioner.scala

@@ -29,10 +29,6 @@ import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.util.{CollectionsUtils, Utils}
 import org.apache.spark.util.random.{XORShiftRandom, SamplingUtils}
-import org.apache.spark.SparkContext.rddToAsyncRDDActions
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
 
 /**
  * An object that defines how the elements in a key-value pair RDD are partitioned by key.
  * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
@@ -117,12 +113,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   private var ordering = implicitly[Ordering[K]]
 
   // An array of upper bounds for the first (partitions - 1) partitions
-  @volatile private var valRB: Array[K] = null
-
-  private def rangeBounds: Array[K] = this.synchronized {
-    if (valRB != null) return valRB
-
-    valRB = if (partitions <= 1) {
+  private var rangeBounds: Array[K] = {
+    if (partitions <= 1) {
       Array.empty
     } else {
       // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
@@ -160,8 +152,6 @@ class RangePartitioner[K : Ordering : ClassTag, V](
         RangePartitioner.determineBounds(candidates, partitions)
       }
     }
-
-    valRB
   }
 
   def numPartitions = rangeBounds.length + 1
@@ -232,8 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   }
 
   @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = this.synchronized {
-    if (valRB != null) return
+  private def readObject(in: ObjectInputStream) {
     val sfactory = SparkEnv.get.serializer
     sfactory match {
       case js: JavaSerializer => in.defaultReadObject()
@@ -245,7 +234,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
         val ser = sfactory.newInstance()
         Utils.deserializeViaNestedStream(in, ser) { ds =>
           implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
-          valRB = ds.readObject[Array[K]]()
+          rangeBounds = ds.readObject[Array[K]]()
         }
     }
   }
@@ -265,18 +254,12 @@ private[spark] object RangePartitioner {
       sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
     val shift = rdd.id
     // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
-    // use collectAsync here to run this job as a future, which is cancellable
-    val sketchFuture = rdd.mapPartitionsWithIndex { (idx, iter) =>
+    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
       val seed = byteswap32(idx ^ (shift << 16))
       val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
-    }.collectAsync()
-    // We do need the future's value to continue any further
-    val sketched = Await.ready(sketchFuture, Duration.Inf).value.get match {
-      case scala.util.Success(v) => v.toArray
-      case scala.util.Failure(e) => throw e
-    }
+    }.collect()
     val numItems = sketched.map(_._2.toLong).sum
     (numItems, sketched)
   }

core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

@@ -23,7 +23,6 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.reflect.ClassTag
 
-import org.apache.spark.util.Utils
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
 import org.apache.spark.annotation.Experimental
 
@@ -39,30 +38,29 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
    * Returns a future for counting the number of elements in the RDD.
    */
   def countAsync(): FutureAction[Long] = {
-    val f = new ComplexFutureAction[Long]
-    f.run {
     val totalCount = new AtomicLong
-      f.runJob(self,
-        (iter: Iterator[T]) => Utils.getIteratorSize(iter),
+    self.context.submitJob(
+      self,
+      (iter: Iterator[T]) => {
+        var result = 0L
+        while (iter.hasNext) {
+          result += 1L
+          iter.next()
+        }
+        result
+      },
       Range(0, self.partitions.size),
       (index: Int, data: Long) => totalCount.addAndGet(data),
       totalCount.get())
-    }
   }
 
   /**
    * Returns a future for retrieving all elements of this RDD.
    */
   def collectAsync(): FutureAction[Seq[T]] = {
-    val f = new ComplexFutureAction[Seq[T]]
-    f.run {
     val results = new Array[Array[T]](self.partitions.size)
-      f.runJob(self,
-        (iter: Iterator[T]) => iter.toArray,
-        Range(0, self.partitions.size),
-        (index: Int, data: Array[T]) => results(index) = data,
-        results.flatten.toSeq)
-    }
+    self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
+      (index, data) => results(index) = data, results.flatten.toSeq)
   }
 
   /**
@@ -106,34 +104,24 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
       }
       results.toSeq
     }
+
+    f
   }
 
   /**
    * Applies a function f to all elements of this RDD.
    */
-  def foreachAsync(expr: T => Unit): FutureAction[Unit] = {
-    val f = new ComplexFutureAction[Unit]
-    val exprClean = self.context.clean(expr)
-    f.run {
-      f.runJob(self,
-        (iter: Iterator[T]) => iter.foreach(exprClean),
-        Range(0, self.partitions.size),
-        (index: Int, data: Unit) => Unit,
-        Unit)
-    }
+  def foreachAsync(f: T => Unit): FutureAction[Unit] = {
+    val cleanF = self.context.clean(f)
+    self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
+      (index, data) => Unit, Unit)
   }
 
   /**
    * Applies a function f to each partition of this RDD.
   */
-  def foreachPartitionAsync(expr: Iterator[T] => Unit): FutureAction[Unit] = {
-    val f = new ComplexFutureAction[Unit]
-    f.run {
-      f.runJob(self,
-        expr,
-        Range(0, self.partitions.size),
-        (index: Int, data: Unit) => Unit,
-        Unit)
-    }
+  def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
+    self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.size),
+      (index, data) => Unit, Unit)
   }
 }
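
For context, the async actions touched by this revert all hand back a FutureAction, which is a scala.concurrent.Future and can therefore be awaited or cancelled while the Spark job runs. A minimal usage sketch against the 1.x API follows; the app name, master URL, and sample data are illustrative and not taken from the commit.

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // brings the rddToAsyncRDDActions implicit into scope on 1.x

object AsyncCountExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("async-count-example").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 1000, 4)

    // countAsync() submits the job and returns immediately with a FutureAction[Long].
    val future = rdd.countAsync()

    // A FutureAction is a scala.concurrent.Future, so it can be awaited here
    // (or cancelled with future.cancel() while the job is still running).
    val n = Await.result(future, Duration.Inf)
    println(s"counted $n elements")

    sc.stop()
  }
}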