Add support and test for null keys in ExternalAppendOnlyMap

Also add safeguard against use of destructively sorted AppendOnlyMap
This commit is contained in:
Andrew Or 2013-12-31 17:19:02 -08:00
parent 3ce22df954
commit 53d8d36684
4 changed files with 142 additions and 35 deletions

View file

@ -48,10 +48,14 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
private var haveNullValue = false
private var nullValue: V = null.asInstanceOf[V]
// Triggered by destructiveSortedIterator; the underlying data array may no longer be used
private var destroyed = false
private val LOAD_FACTOR = 0.7
/** Get the value for a given key */
def apply(key: K): V = {
checkValidityOrThrowException()
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
return nullValue
@ -75,6 +79,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
/** Set the value for a key */
def update(key: K, value: V): Unit = {
checkValidityOrThrowException()
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
if (!haveNullValue) {
@ -109,6 +114,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
* for key, if any, or null otherwise. Returns the newly updated value.
*/
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
checkValidityOrThrowException()
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
if (!haveNullValue) {
@ -142,35 +148,38 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
}
/** Iterator method from Iterable */
override def iterator: Iterator[(K, V)] = new Iterator[(K, V)] {
var pos = -1
override def iterator: Iterator[(K, V)] = {
checkValidityOrThrowException()
new Iterator[(K, V)] {
var pos = -1
/** Get the next value we should return from next(), or null if we're finished iterating */
def nextValue(): (K, V) = {
if (pos == -1) { // Treat position -1 as looking at the null value
if (haveNullValue) {
return (null.asInstanceOf[K], nullValue)
/** Get the next value we should return from next(), or null if we're finished iterating */
def nextValue(): (K, V) = {
if (pos == -1) { // Treat position -1 as looking at the null value
if (haveNullValue) {
return (null.asInstanceOf[K], nullValue)
}
pos += 1
}
while (pos < capacity) {
if (!data(2 * pos).eq(null)) {
return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
}
pos += 1
}
null
}
override def hasNext: Boolean = nextValue() != null
override def next(): (K, V) = {
val value = nextValue()
if (value == null) {
throw new NoSuchElementException("End of iterator")
}
pos += 1
value
}
while (pos < capacity) {
if (!data(2 * pos).eq(null)) {
return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
}
pos += 1
}
null
}
override def hasNext: Boolean = nextValue() != null
override def next(): (K, V) = {
val value = nextValue()
if (value == null) {
throw new NoSuchElementException("End of iterator")
}
pos += 1
value
}
}
@ -238,12 +247,14 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
if (highBit == n) n else highBit << 1
}
/** Return an iterator of the map in sorted order. This provides a way to sort the map without
* using additional memory, at the expense of destroying the validity of the map.
*/
/**
* Return an iterator of the map in sorted order. This provides a way to sort the map without
* using additional memory, at the expense of destroying the validity of the map.
*/
def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = {
var keyIndex, newIndex = 0
destroyed = true
// Pack KV pairs into the front of the underlying array
var keyIndex, newIndex = 0
while (keyIndex < capacity) {
if (data(2 * keyIndex) != null) {
data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
@ -251,23 +262,36 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
}
keyIndex += 1
}
assert(newIndex == curSize)
assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
// Sort by the given ordering
val rawOrdering = new Comparator[AnyRef] {
def compare(x: AnyRef, y: AnyRef): Int = {
cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
}
}
util.Arrays.sort(data, 0, curSize, rawOrdering)
util.Arrays.sort(data, 0, newIndex, rawOrdering)
new Iterator[(K, V)] {
var i = 0
def hasNext = i < curSize
var nullValueReady = haveNullValue
def hasNext: Boolean = (i < newIndex || nullValueReady)
def next(): (K, V) = {
val item = data(i).asInstanceOf[(K, V)]
i += 1
item
if (nullValueReady) {
nullValueReady = false
(null.asInstanceOf[K], nullValue)
} else {
val item = data(i).asInstanceOf[(K, V)]
i += 1
item
}
}
}
}
private def checkValidityOrThrowException(): Unit = {
if (destroyed) {
throw new IllegalStateException("Map state is invalid from destructive sorting!")
}
}
}

View file

@ -112,6 +112,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
private var currentMap = new SizeTrackingAppendOnlyMap[K, G]
private val oldMaps = new ArrayBuffer[DiskKGIterator]
private val memoryThresholdMB = {
val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat

View file

@ -20,6 +20,7 @@ package org.apache.spark.util.collection
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
import java.util.Comparator
class AppendOnlyMapSuite extends FunSuite {
test("initialization") {
@ -151,4 +152,47 @@ class AppendOnlyMapSuite extends FunSuite {
assert(map("" + i) === "" + i)
}
}
test("destructive sort") {
val map = new AppendOnlyMap[String, String]()
for (i <- 1 to 100) {
map("" + i) = "" + i
}
map.update(null, "happy new year!")
try {
map.apply("1")
map.update("1", "2013")
map.changeValue("1", (hadValue, oldValue) => "2014")
map.iterator
} catch {
case e: IllegalStateException => fail()
}
val it = map.destructiveSortedIterator(new Comparator[(String, String)] {
def compare(kv1: (String, String), kv2: (String, String)): Int = {
val x = if (kv1 != null && kv1._1 != null) kv1._1.toInt else Int.MinValue
val y = if (kv2 != null && kv2._1 != null) kv2._1.toInt else Int.MinValue
x.compareTo(y)
}
})
// Should be sorted by key
assert(it.hasNext)
var previous = it.next()
assert(previous == (null, "happy new year!"))
previous = it.next()
assert(previous == ("1", "2014"))
while (it.hasNext) {
val kv = it.next()
assert(kv._1.toInt > previous._1.toInt)
previous = kv
}
// All subsequent calls to apply, update, changeValue and iterator should throw exception
intercept[IllegalStateException] { map.apply("1") }
intercept[IllegalStateException] { map.update("1", "2013") }
intercept[IllegalStateException] { map.changeValue("1", (hadValue, oldValue) => "2014") }
intercept[IllegalStateException] { map.iterator }
}
}

View file

@ -113,6 +113,44 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
}
test("null keys and values") {
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
map.insert(1, 5)
map.insert(2, 6)
map.insert(3, 7)
assert(map.size === 3)
assert(map.iterator.toSet == Set[(Int, Seq[Int])](
(1, Seq[Int](5)),
(2, Seq[Int](6)),
(3, Seq[Int](7))
))
// Null keys
val nullInt = null.asInstanceOf[Int]
map.insert(nullInt, 8)
assert(map.size === 4)
assert(map.iterator.toSet == Set[(Int, Seq[Int])](
(1, Seq[Int](5)),
(2, Seq[Int](6)),
(3, Seq[Int](7)),
(nullInt, Seq[Int](8))
))
// Null values
map.insert(4, nullInt)
map.insert(nullInt, nullInt)
assert(map.size === 5)
val result = map.iterator.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
assert(result == Set[(Int, Set[Int])](
(1, Set[Int](5)),
(2, Set[Int](6)),
(3, Set[Int](7)),
(4, Set[Int](nullInt)),
(nullInt, Set[Int](nullInt, 8))
))
}
test("simple aggregator") {
// reduceByKey
val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1))