[SPARK-17529][CORE] Implement BitSet.clearUntil and use it during merge joins

## What changes were proposed in this pull request?

Add a clearUntil() method on BitSet (adapted from the pre-existing setUntil() method).
Use this method to clear the subset of the BitSet which needs to be used during merge joins.

## How was this patch tested?

dev/run-tests, as well as performance tests on skewed data as described in jira.

I expect there to be a small local performance hit using BitSet.clearUntil rather than BitSet.clear for normally shaped (unskewed) joins (additional read on the last long).  This is expected to be de-minimis and was not specifically tested.

Author: David Navas <davidn@clearstorydata.com>

Closes #15084 from davidnavas/bitSet.
This commit is contained in:
David Navas 2016-09-17 16:22:23 +01:00 committed by Sean Owen
parent 25cbbe6ca3
commit 9dbd4b864e
No known key found for this signature in database
GPG key ID: BEB3956D6717BDDC
3 changed files with 52 additions and 12 deletions

View file

@ -17,6 +17,8 @@
package org.apache.spark.util.collection
import java.util.Arrays
/**
* A simple, fixed-size bit set implementation. This implementation is fast because it avoids
* safety/bound checking.
@ -35,21 +37,14 @@ class BitSet(numBits: Int) extends Serializable {
/**
* Clear all set bits.
*/
def clear(): Unit = {
var i = 0
while (i < numWords) {
words(i) = 0L
i += 1
}
}
def clear(): Unit = Arrays.fill(words, 0)
/**
* Set all the bits up to a given index
*/
def setUntil(bitIndex: Int) {
def setUntil(bitIndex: Int): Unit = {
val wordIndex = bitIndex >> 6 // divide by 64
var i = 0
while(i < wordIndex) { words(i) = -1; i += 1 }
Arrays.fill(words, 0, wordIndex, -1)
if(wordIndex < words.length) {
// Set the remaining bits (note that the mask could still be zero)
val mask = ~(-1L << (bitIndex & 0x3f))
@ -57,6 +52,19 @@ class BitSet(numBits: Int) extends Serializable {
}
}
/**
* Clear all the bits up to a given index
*/
def clearUntil(bitIndex: Int): Unit = {
val wordIndex = bitIndex >> 6 // divide by 64
Arrays.fill(words, 0, wordIndex, 0)
if(wordIndex < words.length) {
// Clear the remaining bits
val mask = -1L << (bitIndex & 0x3f)
words(wordIndex) &= mask
}
}
/**
* Compute the bit-wise AND of the two sets returning the
* result.

View file

@ -152,4 +152,36 @@ class BitSetSuite extends SparkFunSuite {
assert(bitsetDiff.nextSetBit(85) === 85)
assert(bitsetDiff.nextSetBit(86) === -1)
}
test( "[gs]etUntil" ) {
val bitSet = new BitSet(100)
bitSet.setUntil(bitSet.capacity)
(0 until bitSet.capacity).foreach { i =>
assert(bitSet.get(i))
}
bitSet.clearUntil(bitSet.capacity)
(0 until bitSet.capacity).foreach { i =>
assert(!bitSet.get(i))
}
val setUntil = bitSet.capacity / 2
bitSet.setUntil(setUntil)
val clearUntil = setUntil / 2
bitSet.clearUntil(clearUntil)
(0 until clearUntil).foreach { i =>
assert(!bitSet.get(i))
}
(clearUntil until setUntil).foreach { i =>
assert(bitSet.get(i))
}
(setUntil until bitSet.capacity).foreach { i =>
assert(!bitSet.get(i))
}
}
}

View file

@ -954,12 +954,12 @@ private class SortMergeFullOuterJoinScanner(
}
if (leftMatches.size <= leftMatched.capacity) {
leftMatched.clear()
leftMatched.clearUntil(leftMatches.size)
} else {
leftMatched = new BitSet(leftMatches.size)
}
if (rightMatches.size <= rightMatched.capacity) {
rightMatched.clear()
rightMatched.clearUntil(rightMatches.size)
} else {
rightMatched = new BitSet(rightMatches.size)
}