Move maps to util, and refactor more

Author: Andrew Or
Date: 2013-12-26 14:26:22 -08:00
parent 804beb43be
commit 64b2d54a02
7 changed files with 22 additions and 23 deletions

View file

@@ -17,7 +17,7 @@
 package org.apache.spark

-import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}

 /**
  * A set of functions used to aggregate data.

View file

@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
-import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}

 private[spark] sealed trait CoGroupSplitDep extends Serializable

View file

@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.util
+package org.apache.spark.util.collection

 /**
  * A simple open hash table optimized for the append-only use case, where keys
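
AppendOnlyMap is the open hash table the rest of this diff builds on: keys are only ever inserted or updated, never removed. A minimal usage sketch of its (hadValue, oldValue) => newValue callback, the same pattern insert() uses later in this commit (the no-argument constructor with a default capacity is assumed):

    import org.apache.spark.util.collection.AppendOnlyMap

    // Word-count style updates: create a value on first sight, merge afterwards.
    val counts = new AppendOnlyMap[String, Int]
    Seq("a", "b", "a").foreach { word =>
      counts.changeValue(word, (hadValue, oldValue) => if (hadValue) oldValue + 1 else 1)
    }
    counts.iterator.foreach { case (word, count) => println(s"$word -> $count") }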

View file

@@ -15,14 +15,11 @@
  * limitations under the License.
  */

-package org.apache.spark.util
+package org.apache.spark.util.collection

 import java.io._

 import scala.collection.mutable.{ArrayBuffer, PriorityQueue}

-import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap
-
 /**
  * A wrapper for SpillableAppendOnlyMap that handles two cases:
  *
@@ -35,16 +32,15 @@ import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap
 class ExternalAppendOnlyMap[K, V, C](
     createCombiner: V => C,
     mergeValue: (C, V) => C,
-    mergeCombiners: (C, C) => C,
-    memoryThresholdMB: Long = 1024)
+    mergeCombiners: (C, C) => C)
   extends Iterable[(K, C)] with Serializable {

   private val mergeBeforeSpill: Boolean = mergeCombiners != null

   private val map: SpillableAppendOnlyMap[K, V, _, C] = {
     if (mergeBeforeSpill) {
-      new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
-        mergeValue, mergeCombiners, Predef.identity, memoryThresholdMB)
+      new SpillableAppendOnlyMap[K, V, C, C] (createCombiner, mergeValue,
+        mergeCombiners, Predef.identity)
     } else {
       // Use ArrayBuffer[V] as the intermediate combiner
       val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
@@ -64,8 +60,8 @@ class ExternalAppendOnlyMap[K, V, C](
       }
       combiner.getOrElse(null.asInstanceOf[C])
     }
-    new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup,
-      mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
+    new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup, mergeValueIntoGroup,
+      mergeGroups, combineGroup)
   }
 }
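
The refactor above removes the memoryThresholdMB parameter from both public constructors; the spill threshold now comes from system properties instead (next hunk). A sketch of the two construction paths selected by mergeBeforeSpill, assuming ExternalAppendOnlyMap forwards insert to its underlying SpillableAppendOnlyMap (that forwarding is not shown in this diff):

    import org.apache.spark.util.collection.ExternalAppendOnlyMap

    // mergeCombiners != null: combiners of type C are merged before spilling.
    val summing = new ExternalAppendOnlyMap[String, Int, Int](
      createCombiner = (v: Int) => v,
      mergeValue = (c: Int, v: Int) => c + v,
      mergeCombiners = (c1: Int, c2: Int) => c1 + c2)

    // mergeCombiners == null: values are buffered in ArrayBuffer[V] groups and
    // only combined when the final iterator is requested.
    val grouping = new ExternalAppendOnlyMap[String, Int, Int](
      createCombiner = (v: Int) => v,
      mergeValue = (c: Int, v: Int) => c + v,
      mergeCombiners = null)

    summing.insert("a", 1)
    summing.insert("a", 2)
    summing.iterator.foreach { case (k, c) => println(s"$k -> $c") }  // a -> 3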
@@ -82,26 +78,29 @@ class SpillableAppendOnlyMap[K, V, M, C](
     createGroup: V => M,
     mergeValue: (M, V) => M,
     mergeGroups: (M, M) => M,
-    createCombiner: M => C,
-    memoryThresholdMB: Long = 1024)
+    createCombiner: M => C)
   extends Iterable[(K, C)] with Serializable {

   var currentMap = new SizeTrackingAppendOnlyMap[K, M]
-  var oldMaps = new ArrayBuffer[DiskIterator]
+  val oldMaps = new ArrayBuffer[DiskIterator]
+
+  val memoryThreshold = {
+    val bufferSize = System.getProperty("spark.shuffle.buffer", "1024").toLong * 1024 * 1024
+    val bufferPercent = System.getProperty("spark.shuffle.buffer.percent", "0.8").toFloat
+    bufferSize * bufferPercent
+  }

   def insert(key: K, value: V): Unit = {
     val update: (Boolean, M) => M = (hadVal, oldVal) => {
       if (hadVal) mergeValue(oldVal, value) else createGroup(value)
     }
     currentMap.changeValue(key, update)
-    // TODO: Make sure we're only using some % of the actual threshold due to error
-    if (currentMap.estimateSize() > memoryThresholdMB * 1024 * 1024) {
+    if (currentMap.estimateSize() > memoryThreshold) {
       spill()
     }
   }

   def spill(): Unit = {
-    val file = File.createTempFile("external_append_only_map", "") // Add spill location
+    val file = File.createTempFile("external_append_only_map", "")
     val out = new ObjectOutputStream(new FileOutputStream(file))
     val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
     sortedMap.foreach(out.writeObject)
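
Two behavioral changes land in this hunk: the spill threshold is now derived from the spark.shuffle.buffer and spark.shuffle.buffer.percent properties (with the defaults shown, 1024 MB * 0.8, roughly 819 MB), which resolves the old TODO about leaving headroom for estimation error. Spilled entries are sorted by key hash code so that multiple spill files can later be merged in hash order (hence the PriorityQueue import earlier). A self-contained sketch of that spill-and-read-back round trip, using hypothetical sample data:

    import java.io._

    // Sort in-memory pairs by key hash and stream them to a temp file, as spill() does.
    val pairs = Seq("b" -> 2, "a" -> 1, "c" -> 3)
    val file = File.createTempFile("external_append_only_map", "")
    val out = new ObjectOutputStream(new FileOutputStream(file))
    pairs.sortBy(_._1.hashCode).foreach(out.writeObject)
    out.close()

    // Read entries back in hash order, as DiskIterator (not shown here) presumably does.
    val in = new ObjectInputStream(new FileInputStream(file))
    println(in.readObject())  // (a,1): single-character strings hash to their char code
    in.close()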

View file

@@ -1,6 +1,6 @@
 package org.apache.spark.util.collection

-import org.apache.spark.util.{AppendOnlyMap, SamplingSizeTracker}
+import org.apache.spark.util.SamplingSizeTracker

 /** Append-only map that keeps track of its estimated size in bytes. */
 class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] {
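
With both map classes now living in util.collection, only SamplingSizeTracker still needs importing from the old package. The estimateSize() this class provides is what drives the spill check in insert() above; since the tracker samples rather than measures every update, the estimate is approximate. A small sketch:

    import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap

    // Fill the map with roughly 1 MB of values and inspect the sampled estimate.
    val map = new SizeTrackingAppendOnlyMap[Int, Array[Byte]]
    (0 until 1024).foreach { i =>
      map.changeValue(i, (hadValue, oldValue) => if (hadValue) oldValue else new Array[Byte](1024))
    }
    println(s"Estimated size: ${map.estimateSize()} bytes")  // approximate, per SamplingSizeTracker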

View file

@@ -5,7 +5,7 @@ import scala.util.Random
 import org.scalatest.{BeforeAndAfterAll, FunSuite}

 import org.apache.spark.util.SamplingSizeTrackerSuite.LargeDummyClass
-import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap
+import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap}

 class SamplingSizeTrackerSuite extends FunSuite with BeforeAndAfterAll {
   val NORMAL_ERROR = 0.20

View file

@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.util
+package org.apache.spark.util.collection

 import scala.collection.mutable.HashSet