Minor cleanup for Scala style
This commit is contained in:
parent
2a2ca2a661
commit
fcc443b3db
|
@ -67,10 +67,10 @@ case class Aggregator[K, V, C] (
|
|||
combiners.iterator
|
||||
} else {
|
||||
// Spilling
|
||||
def combinerIdentity(combiner:C) = combiner
|
||||
val combiners = new ExternalAppendOnlyMap[K, C, C](combinerIdentity, mergeCombiners, mergeCombiners)
|
||||
val combiners =
|
||||
new ExternalAppendOnlyMap[K, C, C](Predef.identity, mergeCombiners, mergeCombiners)
|
||||
iter.foreach { case(k, c) => combiners.insert(k, c) }
|
||||
combiners.iterator
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv
|
|||
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
|
||||
import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap}
|
||||
|
||||
|
||||
private[spark] sealed trait CoGroupSplitDep extends Serializable
|
||||
|
||||
private[spark] case class NarrowCoGroupSplitDep(
|
||||
|
@ -62,6 +61,10 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
|
|||
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
|
||||
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
|
||||
|
||||
type CoGroup = ArrayBuffer[Any]
|
||||
type CoGroupValue = (Any, Int) // Int is dependency number
|
||||
type CoGroupCombiner = Seq[CoGroup]
|
||||
|
||||
private var serializerClass: String = null
|
||||
|
||||
def setSerializer(cls: String): CoGroupedRDD[K] = {
|
||||
|
@ -125,7 +128,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
|
|||
|
||||
if (!externalSorting) {
|
||||
val map = new AppendOnlyMap[K, CoGroupCombiner]
|
||||
val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
|
||||
val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
|
||||
if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
|
||||
}
|
||||
val getSeq = (k: K) => map.changeValue(k, update)
|
||||
|
@ -147,30 +150,29 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
|
|||
}
|
||||
}
|
||||
|
||||
private def createExternalMap(numRdds:Int): ExternalAppendOnlyMap [K, CoGroupValue, CoGroupCombiner] = {
|
||||
def createCombiner(v: CoGroupValue): CoGroupCombiner = {
|
||||
private def createExternalMap(numRdds: Int)
|
||||
: ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
|
||||
|
||||
val createCombiner: (CoGroupValue) => CoGroupCombiner = v => {
|
||||
val newCombiner = Array.fill(numRdds)(new CoGroup)
|
||||
mergeValue(newCombiner, v)
|
||||
v match { case (value, depNum) => newCombiner(depNum) += value }
|
||||
newCombiner
|
||||
}
|
||||
def mergeValue(c: CoGroupCombiner, v: CoGroupValue): CoGroupCombiner = {
|
||||
val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner = (c, v) => {
|
||||
v match { case (value, depNum) => c(depNum) += value }
|
||||
c
|
||||
}
|
||||
def mergeCombiners(c1: CoGroupCombiner, c2: CoGroupCombiner): CoGroupCombiner = {
|
||||
val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = (c1, c2) => {
|
||||
c1.zipAll(c2, new CoGroup, new CoGroup).map {
|
||||
case (v1, v2) => v1 ++ v2
|
||||
}
|
||||
}
|
||||
new ExternalAppendOnlyMap [K, CoGroupValue, CoGroupCombiner] (
|
||||
createCombiner,mergeValue, mergeCombiners)
|
||||
new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
|
||||
createCombiner, mergeValue, mergeCombiners)
|
||||
}
|
||||
|
||||
override def clearDependencies() {
|
||||
super.clearDependencies()
|
||||
rdds = null
|
||||
}
|
||||
|
||||
type CoGroup = ArrayBuffer[Any]
|
||||
type CoGroupValue = (Any, Int) // Int is dependency number
|
||||
type CoGroupCombiner = Seq[CoGroup]
|
||||
}
|
||||
|
|
|
@ -18,21 +18,27 @@
|
|||
package org.apache.spark.util
|
||||
|
||||
import java.io._
|
||||
import java.text.DecimalFormat
|
||||
|
||||
import scala.Some
|
||||
import scala.Predef._
|
||||
import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
|
||||
import scala.util.Random
|
||||
|
||||
/**
|
||||
* A wrapper for SpillableAppendOnlyMap that handles two cases:
|
||||
*
|
||||
* (1) If a mergeCombiners function is specified, merge values into combiners before
|
||||
* disk spill, as it is possible to merge the resulting combiners later
|
||||
* disk spill, as it is possible to merge the resulting combiners later.
|
||||
*
|
||||
* (2) Otherwise, group values of the same key together before disk spill, and merge
|
||||
* them into combiners only after reading them back from disk
|
||||
* them into combiners only after reading them back from disk.
|
||||
*/
|
||||
class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
|
||||
mergeValue: (C, V) => C,
|
||||
mergeCombiners: (C, C) => C,
|
||||
memoryThresholdMB: Int = 1024)
|
||||
class ExternalAppendOnlyMap[K, V, C](
|
||||
createCombiner: V => C,
|
||||
mergeValue: (C, V) => C,
|
||||
mergeCombiners: (C, C) => C,
|
||||
memoryThresholdMB: Long = 1024)
|
||||
extends Iterable[(K, C)] with Serializable {
|
||||
|
||||
private val mergeBeforeSpill: Boolean = mergeCombiners != null
|
||||
|
@ -40,8 +46,9 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
|
|||
private val map: SpillableAppendOnlyMap[K, V, _, C] = {
|
||||
if (mergeBeforeSpill) {
|
||||
new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
|
||||
mergeValue, mergeCombiners, combinerIdentity, memoryThresholdMB)
|
||||
mergeValue, mergeCombiners, Predef.identity, memoryThresholdMB)
|
||||
} else {
|
||||
val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
|
||||
new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C] (createGroup,
|
||||
mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
|
||||
}
|
||||
|
@ -51,8 +58,6 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
|
|||
|
||||
override def iterator: Iterator[(K, C)] = map.iterator
|
||||
|
||||
private def combinerIdentity(combiner: C): C = combiner
|
||||
private def createGroup(value: V): ArrayBuffer[V] = ArrayBuffer[V](value)
|
||||
private def mergeValueIntoGroup(group: ArrayBuffer[V], value: V): ArrayBuffer[V] = {
|
||||
group += value
|
||||
group
|
||||
|
@ -78,14 +83,16 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
|
|||
* is exceeded. A group with type M is an intermediate combiner, and shares the same
|
||||
* type as either C or ArrayBuffer[V].
|
||||
*/
|
||||
class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
|
||||
mergeValue: (M, V) => M,
|
||||
mergeGroups: (M, M) => M,
|
||||
createCombiner: M => C,
|
||||
memoryThresholdMB: Int = 1024)
|
||||
class SpillableAppendOnlyMap[K, V, M, C](
|
||||
createGroup: V => M,
|
||||
mergeValue: (M, V) => M,
|
||||
mergeGroups: (M, M) => M,
|
||||
createCombiner: M => C,
|
||||
memoryThresholdMB: Long = 1024)
|
||||
extends Iterable[(K, C)] with Serializable {
|
||||
|
||||
var currentMap = new AppendOnlyMap[K, M]
|
||||
var sizeTracker = new SamplingSizeTracker(currentMap)
|
||||
var oldMaps = new ArrayBuffer[DiskIterator]
|
||||
|
||||
def insert(key: K, value: V): Unit = {
|
||||
|
@ -93,9 +100,8 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
|
|||
if (hadVal) mergeValue(oldVal, value) else createGroup(value)
|
||||
}
|
||||
currentMap.changeValue(key, update)
|
||||
val mapSize = SizeEstimator.estimate(currentMap)
|
||||
//if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
|
||||
if (mapSize > 1024 * 10) {
|
||||
sizeTracker.updateMade()
|
||||
if (sizeTracker.estimateSize() > memoryThresholdMB * 1024 * 1024) {
|
||||
spill()
|
||||
}
|
||||
}
|
||||
|
@ -104,9 +110,10 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
|
|||
val file = File.createTempFile("external_append_only_map", "") // Add spill location
|
||||
val out = new ObjectOutputStream(new FileOutputStream(file))
|
||||
val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
|
||||
sortedMap.foreach { out.writeObject( _ ) }
|
||||
sortedMap.foreach(out.writeObject)
|
||||
out.close()
|
||||
currentMap = new AppendOnlyMap[K, M]
|
||||
sizeTracker = new SamplingSizeTracker(currentMap)
|
||||
oldMaps.append(new DiskIterator(file))
|
||||
}
|
||||
|
||||
|
@ -115,13 +122,10 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
|
|||
// An iterator that sort-merges (K, M) pairs from memory and disk into (K, C) pairs
|
||||
class ExternalIterator extends Iterator[(K, C)] {
|
||||
|
||||
// Order by increasing key hash value
|
||||
implicit object KVOrdering extends Ordering[KMITuple] {
|
||||
def compare(a:KMITuple, b:KMITuple) = -a.key.hashCode().compareTo(b.key.hashCode())
|
||||
}
|
||||
val pq = PriorityQueue[KMITuple]()
|
||||
// Order by key hash value
|
||||
val pq = PriorityQueue[KMITuple]()(Ordering.by(_.key.hashCode()))
|
||||
val inputStreams = Seq(new MemoryIterator(currentMap)) ++ oldMaps
|
||||
inputStreams.foreach { readFromIterator( _ ) }
|
||||
inputStreams.foreach(readFromIterator)
|
||||
|
||||
// Read from the given iterator until a key of different hash is retrieved
|
||||
def readFromIterator(iter: Iterator[(K, M)]): Unit = {
|
||||
|
@ -131,10 +135,7 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
|
|||
pq.enqueue(KMITuple(k, m, iter))
|
||||
minHash match {
|
||||
case None => minHash = Some(k.hashCode())
|
||||
case Some(expectedHash) =>
|
||||
if (k.hashCode() != expectedHash){
|
||||
return
|
||||
}
|
||||
case Some(expectedHash) if k.hashCode() != expectedHash => return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -159,16 +160,16 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
|
|||
collidedKMI += newKMI
|
||||
}
|
||||
}
|
||||
collidedKMI.foreach { pq.enqueue( _ ) }
|
||||
collidedKMI.foreach(pq.enqueue(_))
|
||||
(minKey, createCombiner(minGroup))
|
||||
}
|
||||
|
||||
case class KMITuple(key:K, group:M, iterator:Iterator[(K, M)])
|
||||
case class KMITuple(key: K, group: M, iterator: Iterator[(K, M)])
|
||||
}
|
||||
|
||||
// Iterate through (K, M) pairs in sorted order from the in-memory map
|
||||
class MemoryIterator(map: AppendOnlyMap[K, M]) extends Iterator[(K, M)] {
|
||||
val sortedMap = currentMap.iterator.toList.sortBy(kc => kc._1.hashCode())
|
||||
val sortedMap = currentMap.iterator.toList.sortBy(km => km._1.hashCode())
|
||||
val it = sortedMap.iterator
|
||||
override def hasNext: Boolean = it.hasNext
|
||||
override def next(): (K, M) = it.next()
|
||||
|
@ -180,21 +181,18 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
|
|||
var nextItem: Option[(K, M)] = None
|
||||
|
||||
override def hasNext: Boolean = {
|
||||
try {
|
||||
nextItem = Some(in.readObject().asInstanceOf[(K, M)])
|
||||
true
|
||||
nextItem = try {
|
||||
Some(in.readObject().asInstanceOf[(K, M)])
|
||||
} catch {
|
||||
case e: EOFException =>
|
||||
nextItem = None
|
||||
false
|
||||
case e: EOFException => None
|
||||
}
|
||||
nextItem.isDefined
|
||||
}
|
||||
|
||||
override def next(): (K, M) = {
|
||||
nextItem match {
|
||||
case Some(item) => item
|
||||
case None =>
|
||||
throw new NoSuchElementException
|
||||
case None => throw new NoSuchElementException
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue