New minor edits
This commit is contained in:
parent
fcc443b3db
commit
7ad4408255
|
@ -45,8 +45,8 @@ case class Aggregator[K, V, C] (
|
||||||
}
|
}
|
||||||
combiners.iterator
|
combiners.iterator
|
||||||
} else {
|
} else {
|
||||||
// Spilling
|
val combiners =
|
||||||
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
|
new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
|
||||||
iter.foreach { case(k, v) => combiners.insert(k, v) }
|
iter.foreach { case(k, v) => combiners.insert(k, v) }
|
||||||
combiners.iterator
|
combiners.iterator
|
||||||
}
|
}
|
||||||
|
@ -66,7 +66,6 @@ case class Aggregator[K, V, C] (
|
||||||
}
|
}
|
||||||
combiners.iterator
|
combiners.iterator
|
||||||
} else {
|
} else {
|
||||||
// Spilling
|
|
||||||
val combiners =
|
val combiners =
|
||||||
new ExternalAppendOnlyMap[K, C, C](Predef.identity, mergeCombiners, mergeCombiners)
|
new ExternalAppendOnlyMap[K, C, C](Predef.identity, mergeCombiners, mergeCombiners)
|
||||||
iter.foreach { case(k, c) => combiners.insert(k, c) }
|
iter.foreach { case(k, c) => combiners.insert(k, c) }
|
||||||
|
|
|
@ -50,7 +50,6 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
|
||||||
override def hashCode(): Int = idx
|
override def hashCode(): Int = idx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
|
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
|
||||||
* tuple with the list of values for that key.
|
* tuple with the list of values for that key.
|
||||||
|
@ -108,7 +107,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
|
||||||
val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
|
val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
|
||||||
val split = s.asInstanceOf[CoGroupPartition]
|
val split = s.asInstanceOf[CoGroupPartition]
|
||||||
val numRdds = split.deps.size
|
val numRdds = split.deps.size
|
||||||
val ser = SparkEnv.get.serializerManager.get(serializerClass)
|
|
||||||
|
|
||||||
// A list of (rdd iterator, dependency number) pairs
|
// A list of (rdd iterator, dependency number) pairs
|
||||||
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
|
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
|
||||||
|
@ -121,6 +119,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
|
||||||
case ShuffleCoGroupSplitDep(shuffleId) => {
|
case ShuffleCoGroupSplitDep(shuffleId) => {
|
||||||
// Read map outputs of shuffle
|
// Read map outputs of shuffle
|
||||||
val fetcher = SparkEnv.get.shuffleFetcher
|
val fetcher = SparkEnv.get.shuffleFetcher
|
||||||
|
val ser = SparkEnv.get.serializerManager.get(serializerClass)
|
||||||
val v = (fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser), depNum)
|
val v = (fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser), depNum)
|
||||||
rddIterators += v
|
rddIterators += v
|
||||||
}
|
}
|
||||||
|
@ -131,39 +130,39 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
|
||||||
val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
|
val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
|
||||||
if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
|
if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
|
||||||
}
|
}
|
||||||
val getSeq = (k: K) => map.changeValue(k, update)
|
rddIterators.foreach { case(it, depNum) =>
|
||||||
rddIterators.foreach { case(iter, depNum) =>
|
it.foreach { case(k, v) =>
|
||||||
iter.foreach {
|
map.changeValue(k, update)(depNum) += v
|
||||||
case(k, v) => getSeq(k)(depNum) += v
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
new InterruptibleIterator(context, map.iterator)
|
new InterruptibleIterator(context, map.iterator)
|
||||||
} else {
|
} else {
|
||||||
// Spilling
|
|
||||||
val map = createExternalMap(numRdds)
|
val map = createExternalMap(numRdds)
|
||||||
rddIterators.foreach { case(iter, depNum) =>
|
rddIterators.foreach { case(it, depNum) =>
|
||||||
iter.foreach {
|
it.foreach { case(k, v) =>
|
||||||
case(k, v) => map.insert(k, new CoGroupValue(v, depNum))
|
map.insert(k, new CoGroupValue(v, depNum))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
new InterruptibleIterator(context, map.iterator)
|
new InterruptibleIterator(context, map.iterator)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def createExternalMap(numRdds: Int)
|
private def createExternalMap(numRdds: Int):
|
||||||
: ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
|
ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
|
||||||
|
|
||||||
val createCombiner: (CoGroupValue) => CoGroupCombiner = v => {
|
val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
|
||||||
val newCombiner = Array.fill(numRdds)(new CoGroup)
|
val newCombiner = Array.fill(numRdds)(new CoGroup)
|
||||||
v match { case (value, depNum) => newCombiner(depNum) += value }
|
value match { case(v, depNum) => newCombiner(depNum) += v }
|
||||||
newCombiner
|
newCombiner
|
||||||
}
|
}
|
||||||
val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner = (c, v) => {
|
val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
|
||||||
v match { case (value, depNum) => c(depNum) += value }
|
(combiner, value) => {
|
||||||
c
|
value match { case(v, depNum) => combiner(depNum) += v }
|
||||||
|
combiner
|
||||||
}
|
}
|
||||||
val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = (c1, c2) => {
|
val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
|
||||||
c1.zipAll(c2, new CoGroup, new CoGroup).map {
|
(combiner1, combiner2) => {
|
||||||
|
combiner1.zipAll(combiner2, new CoGroup, new CoGroup).map {
|
||||||
case (v1, v2) => v1 ++ v2
|
case (v1, v2) => v1 ++ v2
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,11 +28,11 @@ import scala.util.Random
|
||||||
/**
|
/**
|
||||||
* A wrapper for SpillableAppendOnlyMap that handles two cases:
|
* A wrapper for SpillableAppendOnlyMap that handles two cases:
|
||||||
*
|
*
|
||||||
* (1) If a mergeCombiners function is specified, merge values into combiners before
|
* (1) If a mergeCombiners function is specified, merge values into combiners before disk
|
||||||
* disk spill, as it is possible to merge the resulting combiners later.
|
* spill, as it is possible to merge the resulting combiners later.
|
||||||
*
|
*
|
||||||
* (2) Otherwise, group values of the same key together before disk spill, and merge
|
* (2) Otherwise, group values of the same key together before disk spill, and merge them
|
||||||
* them into combiners only after reading them back from disk.
|
* into combiners only after reading them back from disk.
|
||||||
*/
|
*/
|
||||||
class ExternalAppendOnlyMap[K, V, C](
|
class ExternalAppendOnlyMap[K, V, C](
|
||||||
createCombiner: V => C,
|
createCombiner: V => C,
|
||||||
|
@ -48,8 +48,25 @@ class ExternalAppendOnlyMap[K, V, C](
|
||||||
new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
|
new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
|
||||||
mergeValue, mergeCombiners, Predef.identity, memoryThresholdMB)
|
mergeValue, mergeCombiners, Predef.identity, memoryThresholdMB)
|
||||||
} else {
|
} else {
|
||||||
|
// Use ArrayBuffer[V] as the intermediate combiner
|
||||||
val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
|
val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
|
||||||
new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C] (createGroup,
|
val mergeValueIntoGroup: (ArrayBuffer[V], V) => ArrayBuffer[V] = (group, value) => {
|
||||||
|
group += value
|
||||||
|
}
|
||||||
|
val mergeGroups: (ArrayBuffer[V], ArrayBuffer[V]) => ArrayBuffer[V] = (group1, group2) => {
|
||||||
|
group1 ++= group2
|
||||||
|
}
|
||||||
|
val combineGroup: (ArrayBuffer[V] => C) = group => {
|
||||||
|
var combiner : Option[C] = None
|
||||||
|
group.foreach { v =>
|
||||||
|
combiner match {
|
||||||
|
case None => combiner = Some(createCombiner(v))
|
||||||
|
case Some(c) => combiner = Some(mergeValue(c, v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
combiner.getOrElse(null.asInstanceOf[C])
|
||||||
|
}
|
||||||
|
new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup,
|
||||||
mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
|
mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -57,31 +74,11 @@ class ExternalAppendOnlyMap[K, V, C](
|
||||||
def insert(key: K, value: V): Unit = map.insert(key, value)
|
def insert(key: K, value: V): Unit = map.insert(key, value)
|
||||||
|
|
||||||
override def iterator: Iterator[(K, C)] = map.iterator
|
override def iterator: Iterator[(K, C)] = map.iterator
|
||||||
|
|
||||||
private def mergeValueIntoGroup(group: ArrayBuffer[V], value: V): ArrayBuffer[V] = {
|
|
||||||
group += value
|
|
||||||
group
|
|
||||||
}
|
|
||||||
private def mergeGroups(group1: ArrayBuffer[V], group2: ArrayBuffer[V]): ArrayBuffer[V] = {
|
|
||||||
group1 ++= group2
|
|
||||||
group1
|
|
||||||
}
|
|
||||||
private def combineGroup(group: ArrayBuffer[V]): C = {
|
|
||||||
var combiner : Option[C] = None
|
|
||||||
group.foreach { v =>
|
|
||||||
combiner match {
|
|
||||||
case None => combiner = Some(createCombiner(v))
|
|
||||||
case Some(c) => combiner = Some(mergeValue(c, v))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
combiner.get
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An append-only map that spills sorted content to disk when the memory threshold
|
* An append-only map that spills sorted content to disk when the memory threshold is exceeded.
|
||||||
* is exceeded. A group with type M is an intermediate combiner, and shares the same
|
* A group is an intermediate combiner, with type M equal to either C or ArrayBuffer[V].
|
||||||
* type as either C or ArrayBuffer[V].
|
|
||||||
*/
|
*/
|
||||||
class SpillableAppendOnlyMap[K, V, M, C](
|
class SpillableAppendOnlyMap[K, V, M, C](
|
||||||
createGroup: V => M,
|
createGroup: V => M,
|
||||||
|
@ -96,7 +93,7 @@ class SpillableAppendOnlyMap[K, V, M, C](
|
||||||
var oldMaps = new ArrayBuffer[DiskIterator]
|
var oldMaps = new ArrayBuffer[DiskIterator]
|
||||||
|
|
||||||
def insert(key: K, value: V): Unit = {
|
def insert(key: K, value: V): Unit = {
|
||||||
def update(hadVal: Boolean, oldVal: M): M = {
|
val update: (Boolean, M) => M = (hadVal, oldVal) => {
|
||||||
if (hadVal) mergeValue(oldVal, value) else createGroup(value)
|
if (hadVal) mergeValue(oldVal, value) else createGroup(value)
|
||||||
}
|
}
|
||||||
currentMap.changeValue(key, update)
|
currentMap.changeValue(key, update)
|
||||||
|
@ -128,11 +125,11 @@ class SpillableAppendOnlyMap[K, V, M, C](
|
||||||
inputStreams.foreach(readFromIterator)
|
inputStreams.foreach(readFromIterator)
|
||||||
|
|
||||||
// Read from the given iterator until a key of different hash is retrieved
|
// Read from the given iterator until a key of different hash is retrieved
|
||||||
def readFromIterator(iter: Iterator[(K, M)]): Unit = {
|
def readFromIterator(it: Iterator[(K, M)]): Unit = {
|
||||||
var minHash : Option[Int] = None
|
var minHash : Option[Int] = None
|
||||||
while (iter.hasNext) {
|
while (it.hasNext) {
|
||||||
val (k, m) = iter.next()
|
val (k, m) = it.next()
|
||||||
pq.enqueue(KMITuple(k, m, iter))
|
pq.enqueue(KMITuple(k, m, it))
|
||||||
minHash match {
|
minHash match {
|
||||||
case None => minHash = Some(k.hashCode())
|
case None => minHash = Some(k.hashCode())
|
||||||
case Some(expectedHash) if k.hashCode() != expectedHash => return
|
case Some(expectedHash) if k.hashCode() != expectedHash => return
|
||||||
|
|
Loading…
Reference in a new issue