[SPARK-35757][CORE] Add bitwise AND operation and functionality for intersecting bloom filters

### What changes were proposed in this pull request?
This change is for [SPARK-35757](https://issues.apache.org/jira/browse/SPARK-35757) and does the following:

1. adds bitwise AND operation to BitArray (similar to existing `putAll` method)
2. adds an intersect operation for combining bloom filters using bitwise AND operation (similar to existing `mergeInPlace` method).

### Why are the changes needed?
The current bloom filter library only allows combining two bloom filters using the OR operation. It is useful to have the AND operation as well.

### Does this PR introduce _any_ user-facing change?
No, just adds new methods.

### How was this patch tested?
Just the existing tests.

Closes #32907 from kudhru/master.

Lead-authored-by: kudhru <gargdhruv36@gmail.com>
Co-authored-by: Dhruv Kumar <kudhru@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
kudhru 2021-06-17 06:29:33 +00:00 committed by Wenchen Fan
parent e0d81d9b71
commit 8aeed08d04
5 changed files with 72 additions and 4 deletions

View file

@ -85,6 +85,17 @@ final class BitArray {
    this.bitCount = bitCount;
  }
/** Intersects this BitArray with {@code array} via bitwise AND, refreshing the cached popcount. */
void and(BitArray array) {
  // Element-wise AND only makes sense when both arrays span the same number of words.
  assert data.length == array.data.length : "BitArrays must be of equal length when merging";
  long newCount = 0;
  int i = 0;
  while (i < data.length) {
    data[i] &= array.data[i];
    // Re-tally the surviving set bits word by word.
    newCount += Long.bitCount(data[i]);
    i++;
  }
  this.bitCount = newCount;
}
  void writeTo(DataOutputStream out) throws IOException {
    out.writeInt(data.length);
    for (long datum : data) {

View file

@ -126,6 +126,16 @@ public abstract class BloomFilter {
   */
  public abstract BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException;
/**
 * Combines this bloom filter with another bloom filter by performing a bitwise AND of the
 * underlying data. The mutations happen to <b>this</b> instance. Callers must ensure the
 * bloom filters are appropriately sized to avoid saturating them.
 *
 * @param other The bloom filter to combine this bloom filter with. It is not mutated.
 * @return this bloom filter, mutated in place to hold the intersection of the two filters
 * @throws IncompatibleMergeException if {@code isCompatible(other) == false}
 */
public abstract BloomFilter intersectInPlace(BloomFilter other) throws IncompatibleMergeException;
/**
 * Returns {@code true} if the element <i>might</i> have been put in this Bloom filter,
 * {@code false} if this is <i>definitely</i> not the case.

View file

@ -193,6 +193,22 @@ class BloomFilterImpl extends BloomFilter implements Serializable {
@Override @Override
public BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException { public BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException {
BloomFilterImpl otherImplInstance = checkCompatibilityForMerge(other);
this.bits.putAll(otherImplInstance.bits);
return this;
}
@Override
public BloomFilter intersectInPlace(BloomFilter other) throws IncompatibleMergeException {
  // Validate size / hash-count compatibility before touching any bits.
  BloomFilterImpl compatible = checkCompatibilityForMerge(other);
  // Intersection: AND the underlying bit arrays, mutating this instance only.
  this.bits.and(compatible.bits);
  return this;
}
private BloomFilterImpl checkCompatibilityForMerge(BloomFilter other)
throws IncompatibleMergeException {
    // Duplicates the logic of `isCompatible` here to provide better error message.
    if (other == null) {
      throw new IncompatibleMergeException("Cannot merge null bloom filter");
@ -215,9 +231,7 @@ class BloomFilterImpl extends BloomFilter implements Serializable {
        "Cannot merge bloom filters with different number of hash functions"
      );
    }
return that;
this.bits.putAll(that.bits);
return this;
  }

  @Override

View file

@ -99,9 +99,39 @@ class BloomFilterSuite extends AnyFunSuite { // scalastyle:ignore funsuite
    }
  }
def testIntersectInPlace[T: ClassTag]
  (typeName: String, numItems: Int)(itemGen: Random => T): Unit = {
  test(s"intersectInPlace - $typeName") {
    // use a fixed seed to make the test predictable.
    val r = new Random(37)
    val items1 = Array.fill(numItems / 2)(itemGen(r))
    val items2 = Array.fill(numItems / 2)(itemGen(r))

    val filter1 = BloomFilter.create(numItems / 2)
    items1.foreach(filter1.put)

    val filter2 = BloomFilter.create(numItems / 2)
    items2.foreach(filter2.put)

    filter1.intersectInPlace(filter2)

    // Bloom filters never produce false negatives, so every item present in both
    // input sets must still be reported as (possibly) contained after the AND.
    val commonItems = items1.intersect(items2)
    commonItems.foreach(i => assert(filter1.mightContain(i)))

    // After intersect, `filter1` still has `numItems/2` items
    // which doesn't exceed `expectedNumItems`,
    // so the `expectedFpp` should not be higher than the default one.
    assert(filter1.expectedFpp() - BloomFilter.DEFAULT_FPP < EPSILON)

    // The intersected filter must survive a serialization round trip unchanged.
    checkSerDe(filter1)
  }
}
def testItemType[T: ClassTag](typeName: String, numItems: Int)(itemGen: Random => T): Unit = {
  testAccuracy[T](typeName, numItems)(itemGen)
  testMergeInPlace[T](typeName, numItems)(itemGen)
  testIntersectInPlace[T](typeName, numItems)(itemGen)
}

testItemType[Byte]("Byte", 160) { _.nextInt().toByte }

View file

@ -58,7 +58,10 @@ object MimaExcludes {
ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getChild"),
// [SPARK-35135][CORE] Turn WritablePartitionedIterator from trait into a default implementation class
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.WritablePartitionedIterator"),
// [SPARK-35757][CORE] Add bitwise AND operation and functionality for intersecting bloom filters
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.BloomFilter.intersectInPlace")
)

// Exclude rules for 3.1.x