[SPARK-8309] [CORE] Support for more than 12M items in OpenHashMap
The problem occurs because the position mask `0xEFFFFFF` is incorrect: its 25th bit is zero, so when the capacity grows beyond 2^24, `OpenHashMap` calculates an incorrect index for the value in the `_values` array. I've also added a size check in `rehash()`, so that it fails instead of reporting invalid item indices. Author: Vyacheslav Baranov <slavik.baranov@gmail.com> Closes #6763 from SlavikBaranov/SPARK-8309 and squashes the following commits: 8557445 [Vyacheslav Baranov] Resolved review comments 4d5b954 [Vyacheslav Baranov] Resolved review comments eaf1e68 [Vyacheslav Baranov] Fixed failing test f9284fd [Vyacheslav Baranov] Resolved review comments 3920656 [Vyacheslav Baranov] SPARK-8309: Support for more than 12M items in OpenHashMap
This commit is contained in:
parent
e3de14d3b2
commit
c13da20a55
|
@ -45,7 +45,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
|
|||
loadFactor: Double)
|
||||
extends Serializable {
|
||||
|
||||
require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
|
||||
require(initialCapacity <= OpenHashSet.MAX_CAPACITY,
|
||||
s"Can't make capacity bigger than ${OpenHashSet.MAX_CAPACITY} elements")
|
||||
require(initialCapacity >= 1, "Invalid initial capacity")
|
||||
require(loadFactor < 1.0, "Load factor must be less than 1.0")
|
||||
require(loadFactor > 0.0, "Load factor must be greater than 0.0")
|
||||
|
@ -223,6 +224,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
|
|||
*/
|
||||
private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
|
||||
val newCapacity = _capacity * 2
|
||||
require(newCapacity > 0 && newCapacity <= OpenHashSet.MAX_CAPACITY,
|
||||
s"Can't contain more than ${(loadFactor * OpenHashSet.MAX_CAPACITY).toInt} elements")
|
||||
allocateFunc(newCapacity)
|
||||
val newBitset = new BitSet(newCapacity)
|
||||
val newData = new Array[T](newCapacity)
|
||||
|
@ -276,9 +279,10 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
|
|||
private[spark]
|
||||
object OpenHashSet {
|
||||
|
||||
val MAX_CAPACITY = 1 << 30
|
||||
val INVALID_POS = -1
|
||||
val NONEXISTENCE_MASK = 0x80000000
|
||||
val POSITION_MASK = 0xEFFFFFF
|
||||
val NONEXISTENCE_MASK = 1 << 31
|
||||
val POSITION_MASK = (1 << 31) - 1
|
||||
|
||||
/**
|
||||
* A set of specialized hash function implementation to avoid boxing hash code computation
|
||||
|
|
|
@ -44,7 +44,7 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers {
|
|||
val goodMap3 = new OpenHashMap[String, String](256)
|
||||
assert(goodMap3.size === 0)
|
||||
intercept[IllegalArgumentException] {
|
||||
new OpenHashMap[String, Int](1 << 30) // Invalid map size: bigger than 2^29
|
||||
new OpenHashMap[String, Int](1 << 30 + 1) // Invalid map size: bigger than 2^30
|
||||
}
|
||||
intercept[IllegalArgumentException] {
|
||||
new OpenHashMap[String, Int](-1)
|
||||
|
@ -186,4 +186,14 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers {
|
|||
map(null) = 0
|
||||
assert(map.contains(null))
|
||||
}
|
||||
|
||||
test("support for more than 12M items") {
|
||||
val cnt = 12000000 // 12M
|
||||
val map = new OpenHashMap[Int, Int](cnt)
|
||||
for (i <- 0 until cnt) {
|
||||
map(i) = 1
|
||||
}
|
||||
val numInvalidValues = map.iterator.count(_._2 == 0)
|
||||
assertResult(0)(numInvalidValues)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ class PrimitiveKeyOpenHashMapSuite extends SparkFunSuite with Matchers {
|
|||
val goodMap3 = new PrimitiveKeyOpenHashMap[Int, Int](256)
|
||||
assert(goodMap3.size === 0)
|
||||
intercept[IllegalArgumentException] {
|
||||
new PrimitiveKeyOpenHashMap[Int, Int](1 << 30) // Invalid map size: bigger than 2^29
|
||||
new PrimitiveKeyOpenHashMap[Int, Int](1 << 30 + 1) // Invalid map size: bigger than 2^30
|
||||
}
|
||||
intercept[IllegalArgumentException] {
|
||||
new PrimitiveKeyOpenHashMap[Int, Int](-1)
|
||||
|
|
Loading…
Reference in a new issue