[SPARK-13325][SQL] Create a 64-bit hashcode expression

This PR introduces a 64-bit hashcode expression. Such an expression is especially useful for HyperLogLog++ and other probabilistic data structures.

I have implemented xxHash64, a 64-bit hashing algorithm created by Yann Collet and Mathias Westerdahl. This is a high-speed hash (the C implementation runs at memory bandwidth) with high-quality output. Like MurmurHash it mixes with multiplications and rotations (for quality), and it additionally exploits instruction-level parallelism (for speed).
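To give a feel for the core mixing step, here is a minimal Scala sketch of the per-word round (the constants and the rotate-multiply structure mirror the XXH64 class added below; this is an illustration, not the code in this PR):

    // One xxHash64 round: fold an 8-byte word into an accumulator.
    // PRIME64_1 and PRIME64_2 are the algorithm's fixed odd 64-bit constants.
    val PRIME64_1 = 0x9E3779B185EBCA87L
    val PRIME64_2 = 0xC2B2AE3D27D4EB4FL

    def round(acc: Long, word: Long): Long =
      java.lang.Long.rotateLeft(acc + word * PRIME64_2, 31) * PRIME64_1

The main loop runs four such rounds on independent accumulators per 32-byte block, which is where the instruction-level parallelism comes from.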

The initial results are promising. I have added a codegen'ed test case to `HashBenchmark`, which yields the following results (running from SBT):

    Running benchmark: Hash For simple
      Running case: interpreted version
      Running case: codegen version
      Running case: codegen version 64-bit

    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
    Hash For simple:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    interpreted version                      1011 / 1016        132.8           7.5       1.0X
    codegen version                          1864 / 1869         72.0          13.9       0.5X
    codegen version 64-bit                   1614 / 1644         83.2          12.0       0.6X

    Running benchmark: Hash For normal
      Running case: interpreted version
      Running case: codegen version
      Running case: codegen version 64-bit

    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
    Hash For normal:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    interpreted version                      2467 / 2475          0.9        1176.1       1.0X
    codegen version                          2008 / 2115          1.0         957.5       1.2X
    codegen version 64-bit                    728 /  758          2.9         347.0       3.4X

    Running benchmark: Hash For array
      Running case: interpreted version
      Running case: codegen version
      Running case: codegen version 64-bit

    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
    Hash For array:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    interpreted version                      1544 / 1707          0.1       11779.6       1.0X
    codegen version                          2728 / 2745          0.0       20815.5       0.6X
    codegen version 64-bit                   2508 / 2549          0.1       19132.8       0.6X

    Running benchmark: Hash For map
      Running case: interpreted version
      Running case: codegen version
      Running case: codegen version 64-bit

    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
    Hash For map:                       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    interpreted version                      1819 / 1826          0.0      444014.3       1.0X
    codegen version                           183 /  194          0.0       44642.9       9.9X
    codegen version 64-bit                    173 /  174          0.0       42120.9      10.5X

This shows that the new algorithm is consistently faster than the codegen'ed 32-bit Murmur3 version in all cases, and up to 3x (!) faster in the normal case.

I have also plugged this into HyperLogLog++, and it cuts the processing time of the following code in half:

    import org.apache.spark.sql.functions.approxCountDistinct

    val df = sqlContext.range(1 << 25).agg(approxCountDistinct("id"))
    df.explain()
    val t = System.nanoTime()
    df.show()
    val ns = System.nanoTime() - t

    // Before
    ns: Long = 5821524302

    // After
    ns: Long = 2836418963

cc cloud-fan (you have been working on hashcodes) / rxin

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #11209 from hvanhovell/xxHash.
Herman van Hovell committed on 2016-03-23 20:51:01 +01:00
parent 8c826880f5
commit 919bf32198
7 changed files with 713 additions and 110 deletions


@@ -0,0 +1,192 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.catalyst.expressions;

import org.apache.spark.unsafe.Platform;

// scalastyle:off
/**
 * xxHash64. A high quality and fast 64 bit hash code by Yann Collet and Mathias Westerdahl. The
 * class below is modelled like its Murmur3_x86_32 cousin.
 * <p/>
 * This was largely based on the following (original) C and Java implementations:
 * https://github.com/Cyan4973/xxHash/blob/master/xxhash.c
 * https://github.com/OpenHFT/Zero-Allocation-Hashing/blob/master/src/main/java/net/openhft/hashing/XxHash_r39.java
 * https://github.com/airlift/slice/blob/master/src/main/java/io/airlift/slice/XxHash64.java
 */
// scalastyle:on
public final class XXH64 {

  private static final long PRIME64_1 = 0x9E3779B185EBCA87L;
  private static final long PRIME64_2 = 0xC2B2AE3D27D4EB4FL;
  private static final long PRIME64_3 = 0x165667B19E3779F9L;
  private static final long PRIME64_4 = 0x85EBCA77C2B2AE63L;
  private static final long PRIME64_5 = 0x27D4EB2F165667C5L;

  private final long seed;

  public XXH64(long seed) {
    super();
    this.seed = seed;
  }

  @Override
  public String toString() {
    return "xxHash64(seed=" + seed + ")";
  }

  public long hashInt(int input) {
    return hashInt(input, seed);
  }

  public static long hashInt(int input, long seed) {
    long hash = seed + PRIME64_5 + 4L;
    hash ^= (input & 0xFFFFFFFFL) * PRIME64_1;
    hash = Long.rotateLeft(hash, 23) * PRIME64_2 + PRIME64_3;
    return fmix(hash);
  }

  public long hashLong(long input) {
    return hashLong(input, seed);
  }

  public static long hashLong(long input, long seed) {
    long hash = seed + PRIME64_5 + 8L;
    hash ^= Long.rotateLeft(input * PRIME64_2, 31) * PRIME64_1;
    hash = Long.rotateLeft(hash, 27) * PRIME64_1 + PRIME64_4;
    return fmix(hash);
  }

  public long hashUnsafeWords(Object base, long offset, int length) {
    return hashUnsafeWords(base, offset, length, seed);
  }

  public static long hashUnsafeWords(Object base, long offset, int length, long seed) {
    assert (length % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)";
    long hash = hashBytesByWords(base, offset, length, seed);
    return fmix(hash);
  }

  public long hashUnsafeBytes(Object base, long offset, int length) {
    return hashUnsafeBytes(base, offset, length, seed);
  }

  public static long hashUnsafeBytes(Object base, long offset, int length, long seed) {
    assert (length >= 0) : "lengthInBytes cannot be negative";
    long hash = hashBytesByWords(base, offset, length, seed);
    long end = offset + length;
    offset += length & -8;
    if (offset + 4L <= end) {
      hash ^= (Platform.getInt(base, offset) & 0xFFFFFFFFL) * PRIME64_1;
      hash = Long.rotateLeft(hash, 23) * PRIME64_2 + PRIME64_3;
      offset += 4L;
    }
    while (offset < end) {
      hash ^= (Platform.getByte(base, offset) & 0xFFL) * PRIME64_5;
      hash = Long.rotateLeft(hash, 11) * PRIME64_1;
      offset++;
    }
    return fmix(hash);
  }

  private static long fmix(long hash) {
    hash ^= hash >>> 33;
    hash *= PRIME64_2;
    hash ^= hash >>> 29;
    hash *= PRIME64_3;
    hash ^= hash >>> 32;
    return hash;
  }

  private static long hashBytesByWords(Object base, long offset, int length, long seed) {
    long end = offset + length;
    long hash;
    if (length >= 32) {
      long limit = end - 32;
      long v1 = seed + PRIME64_1 + PRIME64_2;
      long v2 = seed + PRIME64_2;
      long v3 = seed;
      long v4 = seed - PRIME64_1;

      do {
        v1 += Platform.getLong(base, offset) * PRIME64_2;
        v1 = Long.rotateLeft(v1, 31);
        v1 *= PRIME64_1;

        v2 += Platform.getLong(base, offset + 8) * PRIME64_2;
        v2 = Long.rotateLeft(v2, 31);
        v2 *= PRIME64_1;

        v3 += Platform.getLong(base, offset + 16) * PRIME64_2;
        v3 = Long.rotateLeft(v3, 31);
        v3 *= PRIME64_1;

        v4 += Platform.getLong(base, offset + 24) * PRIME64_2;
        v4 = Long.rotateLeft(v4, 31);
        v4 *= PRIME64_1;

        offset += 32L;
      } while (offset <= limit);

      hash = Long.rotateLeft(v1, 1)
          + Long.rotateLeft(v2, 7)
          + Long.rotateLeft(v3, 12)
          + Long.rotateLeft(v4, 18);

      v1 *= PRIME64_2;
      v1 = Long.rotateLeft(v1, 31);
      v1 *= PRIME64_1;
      hash ^= v1;
      hash = hash * PRIME64_1 + PRIME64_4;

      v2 *= PRIME64_2;
      v2 = Long.rotateLeft(v2, 31);
      v2 *= PRIME64_1;
      hash ^= v2;
      hash = hash * PRIME64_1 + PRIME64_4;

      v3 *= PRIME64_2;
      v3 = Long.rotateLeft(v3, 31);
      v3 *= PRIME64_1;
      hash ^= v3;
      hash = hash * PRIME64_1 + PRIME64_4;

      v4 *= PRIME64_2;
      v4 = Long.rotateLeft(v4, 31);
      v4 *= PRIME64_1;
      hash ^= v4;
      hash = hash * PRIME64_1 + PRIME64_4;
    } else {
      hash = seed + PRIME64_5;
    }

    hash += length;

    long limit = end - 8;
    while (offset <= limit) {
      long k1 = Platform.getLong(base, offset);
      hash ^= Long.rotateLeft(k1 * PRIME64_2, 31) * PRIME64_1;
      hash = Long.rotateLeft(hash, 27) * PRIME64_1 + PRIME64_4;
      offset += 8L;
    }
    return hash;
  }
}
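For illustration, a minimal usage sketch of the class above (callable from Scala as well; the values are arbitrary):

    import org.apache.spark.sql.catalyst.expressions.XXH64

    // Sketch: hashing primitive values with the XXH64 class above.
    val hasher = new XXH64(42L)              // instance bound to a fixed seed
    val h1: Long = hasher.hashInt(123)       // hash a 4-byte int
    val h2: Long = XXH64.hashLong(456L, 42L) // static variant with an explicit seed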


@@ -169,7 +169,7 @@ case class HyperLogLogPlusPlus(
     val v = child.eval(input)
     if (v != null) {
       // Create the hashed value 'x'.
-      val x = MurmurHash.hash64(v)
+      val x = XxHash64Function.hash(v, child.dataType, 42L)
       // Determine the index of the register we are going to use.
       val idx = (x >>> idxShift).toInt
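For context, a minimal sketch of how HyperLogLog++ consumes the 64-bit hash. The precision `p = 9` and the `idxShift` formula are illustrative assumptions based on the surrounding HLL++ code, not part of this diff:

    import org.apache.spark.sql.catalyst.expressions.XxHash64Function
    import org.apache.spark.sql.types.LongType

    // Sketch: splitting the 64-bit hash into register index and pattern bits.
    val p = 9                                             // assumed precision
    val idxShift = java.lang.Long.SIZE - p                // assumed: 64 - p
    val x = XxHash64Function.hash(12345L, LongType, 42L)  // 64-bit hashed value 'x'
    val idx = (x >>> idxShift).toInt                      // top p bits: register index
    // The remaining bits are examined for their leading-zero run to update register idx,
    // so a 64-bit hash leaves far more usable bits per register than a 32-bit one.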


@@ -185,6 +185,7 @@ case class Crc32(child: Expression) extends UnaryExpression with ImplicitCastInp
   }
 }

 /**
  * A function that calculates hash value for a group of expressions. Note that the `seed` argument
  * is not exposed to users and should only be set inside spark SQL.
@@ -213,14 +214,10 @@ case class Crc32(child: Expression) extends UnaryExpression with ImplicitCastInp
  * `result`.
  *
  * Finally we aggregate the hash values for each expression by the same way of struct.
- *
- * We should use this hash function for both shuffle and bucket, so that we can guarantee shuffle
- * and bucketing have same data distribution.
  */
-case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression {
-  def this(arguments: Seq[Expression]) = this(arguments, 42)
-
-  override def dataType: DataType = IntegerType
+abstract class HashExpression[E] extends Expression {
+  /** Seed of the HashExpression. */
+  val seed: E

   override def foldable: Boolean = children.forall(_.foldable)
@@ -234,8 +231,6 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
     }
   }

-  override def prettyName: String = "hash"
-
   override def eval(input: InternalRow): Any = {
     var hash = seed
     var i = 0
@@ -247,80 +242,7 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
     hash
   }

-  private def computeHash(value: Any, dataType: DataType, seed: Int): Int = {
-    def hashInt(i: Int): Int = Murmur3_x86_32.hashInt(i, seed)
-    def hashLong(l: Long): Int = Murmur3_x86_32.hashLong(l, seed)
-
-    value match {
-      case null => seed
-      case b: Boolean => hashInt(if (b) 1 else 0)
-      case b: Byte => hashInt(b)
-      case s: Short => hashInt(s)
-      case i: Int => hashInt(i)
-      case l: Long => hashLong(l)
-      case f: Float => hashInt(java.lang.Float.floatToIntBits(f))
-      case d: Double => hashLong(java.lang.Double.doubleToLongBits(d))
-      case d: Decimal =>
-        val precision = dataType.asInstanceOf[DecimalType].precision
-        if (precision <= Decimal.MAX_LONG_DIGITS) {
-          hashLong(d.toUnscaledLong)
-        } else {
-          val bytes = d.toJavaBigDecimal.unscaledValue().toByteArray
-          Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, seed)
-        }
-      case c: CalendarInterval => Murmur3_x86_32.hashInt(c.months, hashLong(c.microseconds))
-      case a: Array[Byte] =>
-        Murmur3_x86_32.hashUnsafeBytes(a, Platform.BYTE_ARRAY_OFFSET, a.length, seed)
-      case s: UTF8String =>
-        Murmur3_x86_32.hashUnsafeBytes(s.getBaseObject, s.getBaseOffset, s.numBytes(), seed)
-      case array: ArrayData =>
-        val elementType = dataType match {
-          case udt: UserDefinedType[_] => udt.sqlType.asInstanceOf[ArrayType].elementType
-          case ArrayType(et, _) => et
-        }
-        var result = seed
-        var i = 0
-        while (i < array.numElements()) {
-          result = computeHash(array.get(i, elementType), elementType, result)
-          i += 1
-        }
-        result
-      case map: MapData =>
-        val (kt, vt) = dataType match {
-          case udt: UserDefinedType[_] =>
-            val mapType = udt.sqlType.asInstanceOf[MapType]
-            mapType.keyType -> mapType.valueType
-          case MapType(kt, vt, _) => kt -> vt
-        }
-        val keys = map.keyArray()
-        val values = map.valueArray()
-        var result = seed
-        var i = 0
-        while (i < map.numElements()) {
-          result = computeHash(keys.get(i, kt), kt, result)
-          result = computeHash(values.get(i, vt), vt, result)
-          i += 1
-        }
-        result
-      case struct: InternalRow =>
-        val types: Array[DataType] = dataType match {
-          case udt: UserDefinedType[_] =>
-            udt.sqlType.asInstanceOf[StructType].map(_.dataType).toArray
-          case StructType(fields) => fields.map(_.dataType)
-        }
-        var result = seed
-        var i = 0
-        val len = struct.numFields
-        while (i < len) {
-          result = computeHash(struct.get(i, types(i)), types(i), result)
-          i += 1
-        }
-        result
-    }
-  }
+  protected def computeHash(value: Any, dataType: DataType, seed: E): E

   override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
     ev.isNull = "false"
@@ -332,7 +254,7 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
     }.mkString("\n")
     s"""
-      int ${ev.value} = $seed;
+      ${ctx.javaType(dataType)} ${ev.value} = $seed;
       $childrenHash
     """
   }
@@ -360,7 +282,7 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
       dataType: DataType,
       result: String,
       ctx: CodegenContext): String = {
-    val hasher = classOf[Murmur3_x86_32].getName
+    val hasher = hasherClassName
     def hashInt(i: String): String = s"$result = $hasher.hashInt($i, $result);"
     def hashLong(l: String): String = s"$result = $hasher.hashLong($l, $result);"
@@ -423,6 +345,125 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
       case udt: UserDefinedType[_] => computeHash(input, udt.sqlType, result, ctx)
     }
   }
+
+  protected def hasherClassName: String
+}
+
+/**
+ * Base class for interpreted hash functions.
+ */
+abstract class InterpretedHashFunction {
+  protected def hashInt(i: Int, seed: Long): Long
+
+  protected def hashLong(l: Long, seed: Long): Long
+
+  protected def hashUnsafeBytes(base: AnyRef, offset: Long, length: Int, seed: Long): Long
+
+  def hash(value: Any, dataType: DataType, seed: Long): Long = {
+    value match {
+      case null => seed
+      case b: Boolean => hashInt(if (b) 1 else 0, seed)
+      case b: Byte => hashInt(b, seed)
+      case s: Short => hashInt(s, seed)
+      case i: Int => hashInt(i, seed)
+      case l: Long => hashLong(l, seed)
+      case f: Float => hashInt(java.lang.Float.floatToIntBits(f), seed)
+      case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed)
+      case d: Decimal =>
+        val precision = dataType.asInstanceOf[DecimalType].precision
+        if (precision <= Decimal.MAX_LONG_DIGITS) {
+          hashLong(d.toUnscaledLong, seed)
+        } else {
+          val bytes = d.toJavaBigDecimal.unscaledValue().toByteArray
+          hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, seed)
+        }
+      case c: CalendarInterval => hashInt(c.months, hashLong(c.microseconds, seed))
+      case a: Array[Byte] =>
+        hashUnsafeBytes(a, Platform.BYTE_ARRAY_OFFSET, a.length, seed)
+      case s: UTF8String =>
+        hashUnsafeBytes(s.getBaseObject, s.getBaseOffset, s.numBytes(), seed)
+      case array: ArrayData =>
+        val elementType = dataType match {
+          case udt: UserDefinedType[_] => udt.sqlType.asInstanceOf[ArrayType].elementType
+          case ArrayType(et, _) => et
+        }
+        var result = seed
+        var i = 0
+        while (i < array.numElements()) {
+          result = hash(array.get(i, elementType), elementType, result)
+          i += 1
+        }
+        result
+      case map: MapData =>
+        val (kt, vt) = dataType match {
+          case udt: UserDefinedType[_] =>
+            val mapType = udt.sqlType.asInstanceOf[MapType]
+            mapType.keyType -> mapType.valueType
+          case MapType(kt, vt, _) => kt -> vt
+        }
+        val keys = map.keyArray()
+        val values = map.valueArray()
+        var result = seed
+        var i = 0
+        while (i < map.numElements()) {
+          result = hash(keys.get(i, kt), kt, result)
+          result = hash(values.get(i, vt), vt, result)
+          i += 1
+        }
+        result
+      case struct: InternalRow =>
+        val types: Array[DataType] = dataType match {
+          case udt: UserDefinedType[_] =>
+            udt.sqlType.asInstanceOf[StructType].map(_.dataType).toArray
+          case StructType(fields) => fields.map(_.dataType)
+        }
+        var result = seed
+        var i = 0
+        val len = struct.numFields
+        while (i < len) {
+          result = hash(struct.get(i, types(i)), types(i), result)
+          i += 1
+        }
+        result
+    }
+  }
+}
+
+/**
+ * A MurMur3 Hash expression.
+ *
+ * We should use this hash function for both shuffle and bucket, so that we can guarantee shuffle
+ * and bucketing have same data distribution.
+ */
+case class Murmur3Hash(children: Seq[Expression], seed: Int) extends HashExpression[Int] {
+  def this(arguments: Seq[Expression]) = this(arguments, 42)
+
+  override def dataType: DataType = IntegerType
+
+  override def prettyName: String = "hash"
+
+  override protected def hasherClassName: String = classOf[Murmur3_x86_32].getName
+
+  override protected def computeHash(value: Any, dataType: DataType, seed: Int): Int = {
+    Murmur3HashFunction.hash(value, dataType, seed).toInt
+  }
+}
+
+object Murmur3HashFunction extends InterpretedHashFunction {
+  override protected def hashInt(i: Int, seed: Long): Long = {
+    Murmur3_x86_32.hashInt(i, seed.toInt)
+  }

+  override protected def hashLong(l: Long, seed: Long): Long = {
+    Murmur3_x86_32.hashLong(l, seed.toInt)
+  }
+
+  override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+    Murmur3_x86_32.hashUnsafeBytes(base, offset, len, seed.toInt)
+  }
 }

 /**
@@ -442,3 +483,30 @@ case class PrintToStderr(child: Expression) extends UnaryExpression {
     """.stripMargin)
   }
 }
+
+/**
+ * A xxHash64 64-bit hash expression.
+ */
+case class XxHash64(children: Seq[Expression], seed: Long) extends HashExpression[Long] {
+  def this(arguments: Seq[Expression]) = this(arguments, 42L)
+
+  override def dataType: DataType = LongType
+
+  override def prettyName: String = "xxHash"
+
+  override protected def hasherClassName: String = classOf[XXH64].getName
+
+  override protected def computeHash(value: Any, dataType: DataType, seed: Long): Long = {
+    XxHash64Function.hash(value, dataType, seed)
+  }
+}
+
+object XxHash64Function extends InterpretedHashFunction {
+  override protected def hashInt(i: Int, seed: Long): Long = XXH64.hashInt(i, seed)
+
+  override protected def hashLong(l: Long, seed: Long): Long = XXH64.hashLong(l, seed)
+
+  override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+    XXH64.hashUnsafeBytes(base, offset, len, seed)
+  }
+}
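For reference, a minimal sketch of evaluating the new expression on literals via the interpreted path (mirroring what the updated test suite below does):

    import org.apache.spark.sql.catalyst.expressions.{Literal, XxHash64}

    // The auxiliary constructor defaults the seed to 42L, like Murmur3Hash's 42.
    val expr = new XxHash64(Seq(Literal(1), Literal("foo")))
    val result = expr.eval().asInstanceOf[Long] // a 64-bit hash code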


@@ -0,0 +1,166 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.catalyst.expressions;

import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

import org.apache.spark.unsafe.Platform;
import org.junit.Assert;
import org.junit.Test;

/**
 * Test the XXH64 function.
 * <p/>
 * Test constants were taken from the original implementation and the airlift/slice implementation.
 */
public class XXH64Suite {

  private static final XXH64 hasher = new XXH64(0);

  private static final int SIZE = 101;
  private static final long PRIME = 2654435761L;
  private static final byte[] BUFFER = new byte[SIZE];
  private static final int TEST_INT = 0x4B1FFF9E; // First 4 bytes in the buffer
  private static final long TEST_LONG = 0xDD2F535E4B1FFF9EL; // First 8 bytes in the buffer

  /* Create the test data. */
  static {
    long seed = PRIME;
    for (int i = 0; i < SIZE; i++) {
      BUFFER[i] = (byte) (seed >> 24);
      seed *= seed;
    }
  }

  @Test
  public void testKnownIntegerInputs() {
    Assert.assertEquals(0x9256E58AA397AEF1L, hasher.hashInt(TEST_INT));
    Assert.assertEquals(0x9D5FFDFB928AB4BL, XXH64.hashInt(TEST_INT, PRIME));
  }

  @Test
  public void testKnownLongInputs() {
    Assert.assertEquals(0xF74CB1451B32B8CFL, hasher.hashLong(TEST_LONG));
    Assert.assertEquals(0x9C44B77FBCC302C5L, XXH64.hashLong(TEST_LONG, PRIME));
  }

  @Test
  public void testKnownByteArrayInputs() {
    Assert.assertEquals(0xEF46DB3751D8E999L,
        hasher.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 0));
    Assert.assertEquals(0xAC75FDA2929B17EFL,
        XXH64.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 0, PRIME));
    Assert.assertEquals(0x4FCE394CC88952D8L,
        hasher.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 1));
    Assert.assertEquals(0x739840CB819FA723L,
        XXH64.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 1, PRIME));

    // These tests currently fail in a big endian environment because the test data and expected
    // answers are generated with little-endian assumptions. We could revisit this when Platform
    // becomes endian aware.
    if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
      Assert.assertEquals(0x9256E58AA397AEF1L,
          hasher.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 4));
      Assert.assertEquals(0x9D5FFDFB928AB4BL,
          XXH64.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 4, PRIME));
      Assert.assertEquals(0xF74CB1451B32B8CFL,
          hasher.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 8));
      Assert.assertEquals(0x9C44B77FBCC302C5L,
          XXH64.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 8, PRIME));
      Assert.assertEquals(0xCFFA8DB881BC3A3DL,
          hasher.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 14));
      Assert.assertEquals(0x5B9611585EFCC9CBL,
          XXH64.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, 14, PRIME));
      Assert.assertEquals(0x0EAB543384F878ADL,
          hasher.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, SIZE));
      Assert.assertEquals(0xCAA65939306F1E21L,
          XXH64.hashUnsafeBytes(BUFFER, Platform.BYTE_ARRAY_OFFSET, SIZE, PRIME));
    }
  }

  @Test
  public void randomizedStressTest() {
    int size = 65536;
    Random rand = new Random();

    // A set used to track collision rate.
    Set<Long> hashcodes = new HashSet<>();
    for (int i = 0; i < size; i++) {
      int vint = rand.nextInt();
      long lint = rand.nextLong();
      Assert.assertEquals(hasher.hashInt(vint), hasher.hashInt(vint));
      Assert.assertEquals(hasher.hashLong(lint), hasher.hashLong(lint));

      hashcodes.add(hasher.hashLong(lint));
    }

    // A very loose bound.
    Assert.assertTrue(hashcodes.size() > size * 0.95d);
  }

  @Test
  public void randomizedStressTestBytes() {
    int size = 65536;
    Random rand = new Random();

    // A set used to track collision rate.
    Set<Long> hashcodes = new HashSet<>();
    for (int i = 0; i < size; i++) {
      int byteArrSize = rand.nextInt(100) * 8;
      byte[] bytes = new byte[byteArrSize];
      rand.nextBytes(bytes);

      Assert.assertEquals(
          hasher.hashUnsafeWords(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
          hasher.hashUnsafeWords(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));

      hashcodes.add(hasher.hashUnsafeWords(
          bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
    }

    // A very loose bound.
    Assert.assertTrue(hashcodes.size() > size * 0.95d);
  }

  @Test
  public void randomizedStressTestPaddedStrings() {
    int size = 64000;
    // A set used to track collision rate.
    Set<Long> hashcodes = new HashSet<>();
    for (int i = 0; i < size; i++) {
      int byteArrSize = 8;
      byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
      byte[] paddedBytes = new byte[byteArrSize];
      System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length);

      Assert.assertEquals(
          hasher.hashUnsafeWords(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
          hasher.hashUnsafeWords(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));

      hashcodes.add(hasher.hashUnsafeWords(
          paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
    }

    // A very loose bound.
    Assert.assertTrue(hashcodes.size() > size * 0.95d);
  }
}
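The known-answer vectors above can also be spot-checked directly from Scala (a sketch; the constants are copied from the suite):

    import org.apache.spark.sql.catalyst.expressions.XXH64

    // Matches testKnownIntegerInputs above: seed 0, then seed PRIME (2654435761L).
    assert(XXH64.hashInt(0x4B1FFF9E, 0L) == 0x9256E58AA397AEF1L)
    assert(XXH64.hashInt(0x4B1FFF9E, 2654435761L) == 0x9D5FFDFB928AB4BL)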


@@ -18,14 +18,14 @@
 package org.apache.spark.sql

 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Murmur3Hash, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Benchmark

 /**
- * Benchmark for the previous interpreted hash function(InternalRow.hashCode) vs the new codegen
- * hash expression(Murmur3Hash).
+ * Benchmark for the previous interpreted hash function(InternalRow.hashCode) vs codegened
+ * hash expressions (Murmur3Hash/xxHash64).
  */
 object HashBenchmark {
@@ -63,19 +63,44 @@ object HashBenchmark {
       }
     }

+    val getHashCode64b = UnsafeProjection.create(new XxHash64(attrs) :: Nil, attrs)
+    benchmark.addCase("codegen version 64-bit") { _: Int =>
+      for (_ <- 0L until iters) {
+        var sum = 0
+        var i = 0
+        while (i < numRows) {
+          sum += getHashCode64b(rows(i)).getInt(0)
+          i += 1
+        }
+      }
+    }
+
     benchmark.run()
   }

   def main(args: Array[String]): Unit = {
-    val simple = new StructType().add("i", IntegerType)
+    val singleInt = new StructType().add("i", IntegerType)
     /*
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-    Hash For simple:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
+    Hash For single ints:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -------------------------------------------------------------------------------------------
-    interpreted version                       941 /  955        142.6           7.0       1.0X
-    codegen version                          1737 / 1775         77.3          12.9       0.5X
+    interpreted version                      1006 / 1011        133.4           7.5       1.0X
+    codegen version                          1835 / 1839         73.1          13.7       0.5X
+    codegen version 64-bit                   1627 / 1628         82.5          12.1       0.6X
     */
-    test("simple", simple, 1 << 13, 1 << 14)
+    test("single ints", singleInt, 1 << 15, 1 << 14)
+
+    val singleLong = new StructType().add("i", LongType)
+    /*
+    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
+    Hash For single longs:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    interpreted version                      1196 / 1209        112.2           8.9       1.0X
+    codegen version                          2178 / 2181         61.6          16.2       0.5X
+    codegen version 64-bit                   1752 / 1753         76.6          13.1       0.7X
+    */
+    test("single longs", singleLong, 1 << 15, 1 << 14)

     val normal = new StructType()
       .add("null", NullType)
@@ -93,11 +118,12 @@ object HashBenchmark {
       .add("date", DateType)
       .add("timestamp", TimestampType)
     /*
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
     Hash For normal:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -------------------------------------------------------------------------------------------
-    interpreted version                      2209 / 2271          0.9        1053.4       1.0X
-    codegen version                          1887 / 2018          1.1         899.9       1.2X
+    interpreted version                      2713 / 2715          0.8        1293.5       1.0X
+    codegen version                          2015 / 2018          1.0         960.9       1.3X
+    codegen version 64-bit                    735 /  738          2.9         350.7       3.7X
     */
     test("normal", normal, 1 << 10, 1 << 11)
@@ -106,11 +132,12 @@ object HashBenchmark {
       .add("array", arrayOfInt)
       .add("arrayOfArray", ArrayType(arrayOfInt))
     /*
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
     Hash For array:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -------------------------------------------------------------------------------------------
-    interpreted version                      1481 / 1529          0.1       11301.7       1.0X
-    codegen version                          2591 / 2636          0.1       19771.1       0.6X
+    interpreted version                      1498 / 1499          0.1       11432.1       1.0X
+    codegen version                          2642 / 2643          0.0       20158.4       0.6X
+    codegen version 64-bit                   2421 / 2424          0.1       18472.5       0.6X
     */
     test("array", array, 1 << 8, 1 << 9)
@@ -119,11 +146,12 @@ object HashBenchmark {
       .add("map", mapOfInt)
       .add("mapOfMap", MapType(IntegerType, mapOfInt))
     /*
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
     Hash For map:                       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -------------------------------------------------------------------------------------------
-    interpreted version                      1820 / 1861          0.0      444347.2       1.0X
-    codegen version                           205 /  223          0.0       49936.5       8.9X
+    interpreted version                      1612 / 1618          0.0      393553.4       1.0X
+    codegen version                           149 /  150          0.0       36381.2      10.8X
+    codegen version 64-bit                    144 /  145          0.0       35122.1      11.2X
     */
     test("map", map, 1 << 6, 1 << 6)
   }


@@ -0,0 +1,148 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql

import java.util.Random

import org.apache.spark.sql.catalyst.expressions.XXH64
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.hash.Murmur3_x86_32
import org.apache.spark.util.Benchmark

/**
 * Synthetic benchmark for MurMurHash 3 and xxHash64.
 */
object HashByteArrayBenchmark {
  def test(length: Int, seed: Long, numArrays: Int, iters: Int): Unit = {
    val random = new Random(seed)
    val arrays = Array.fill[Array[Byte]](numArrays) {
      val bytes = new Array[Byte](length)
      random.nextBytes(bytes)
      bytes
    }

    val benchmark = new Benchmark("Hash byte arrays with length " + length, iters * numArrays)
    benchmark.addCase("Murmur3_x86_32") { _: Int =>
      for (_ <- 0L until iters) {
        var sum = 0
        var i = 0
        while (i < numArrays) {
          sum += Murmur3_x86_32.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length, 42)
          i += 1
        }
      }
    }

    benchmark.addCase("xxHash 64-bit") { _: Int =>
      for (_ <- 0L until iters) {
        var sum = 0L
        var i = 0
        while (i < numArrays) {
          sum += XXH64.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length, 42)
          i += 1
        }
      }
    }

    benchmark.run()
  }

  def main(args: Array[String]): Unit = {
    /*
    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
    Hash byte arrays with length 8:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    Murmur3_x86_32                             11 /   12        185.1           5.4       1.0X
    xxHash 64-bit                              17 /   18        120.0           8.3       0.6X
    */
    test(8, 42L, 1 << 10, 1 << 11)

    /*
    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
    Hash byte arrays with length 16:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    Murmur3_x86_32                             18 /   18        118.6           8.4       1.0X
    xxHash 64-bit                              20 /   21        102.5           9.8       0.9X
    */
    test(16, 42L, 1 << 10, 1 << 11)

    /*
    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
    Hash byte arrays with length 24:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    Murmur3_x86_32                             24 /   24         86.6          11.5       1.0X
    xxHash 64-bit                              23 /   23         93.2          10.7       1.1X
    */
    test(24, 42L, 1 << 10, 1 << 11)

    // Add 31 to all array lengths to create worst-case alignment for xxHash.
    /*
    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
    Hash byte arrays with length 31:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    Murmur3_x86_32                             38 /   39         54.7          18.3       1.0X
    xxHash 64-bit                              33 /   33         64.4          15.5       1.2X
    */
    test(31, 42L, 1 << 10, 1 << 11)

    /*
    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
    Hash byte arrays with length 95:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    Murmur3_x86_32                             91 /   94         22.9          43.6       1.0X
    xxHash 64-bit                              68 /   69         30.6          32.7       1.3X
    */
    test(64 + 31, 42L, 1 << 10, 1 << 11)

    /*
    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
    Hash byte arrays with length 287:   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    Murmur3_x86_32                            268 /  268          7.8         127.6       1.0X
    xxHash 64-bit                             108 /  109         19.4          51.6       2.5X
    */
    test(256 + 31, 42L, 1 << 10, 1 << 11)

    /*
    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
    Hash byte arrays with length 1055:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    Murmur3_x86_32                            942 /  945          2.2         449.4       1.0X
    xxHash 64-bit                             276 /  276          7.6         131.4       3.4X
    */
    test(1024 + 31, 42L, 1 << 10, 1 << 11)

    /*
    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
    Hash byte arrays with length 2079:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    Murmur3_x86_32                           1839 / 1843          1.1         876.8       1.0X
    xxHash 64-bit                             445 /  448          4.7         212.1       4.1X
    */
    test(2048 + 31, 42L, 1 << 10, 1 << 11)

    /*
    Intel(R) Core(TM) i7-4750HQ CPU @ 2.00GHz
    Hash byte arrays with length 8223:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    Murmur3_x86_32                           7307 / 7310          0.3        3484.4       1.0X
    xxHash 64-bit                            1487 / 1488          1.4         709.1       4.9X
    */
    test(8192 + 31, 42L, 1 << 10, 1 << 11)
  }
}


@@ -76,7 +76,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   private val mapOfString = MapType(StringType, StringType)
   private val arrayOfUDT = ArrayType(new ExamplePointUDT, false)

-  testMurmur3Hash(
+  testHash(
     new StructType()
       .add("null", NullType)
       .add("boolean", BooleanType)
@@ -94,7 +94,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       .add("timestamp", TimestampType)
       .add("udt", new ExamplePointUDT))

-  testMurmur3Hash(
+  testHash(
     new StructType()
       .add("arrayOfNull", arrayOfNull)
       .add("arrayOfString", arrayOfString)
@@ -104,7 +104,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       .add("arrayOfStruct", ArrayType(structOfString))
       .add("arrayOfUDT", arrayOfUDT))

-  testMurmur3Hash(
+  testHash(
     new StructType()
       .add("mapOfIntAndString", MapType(IntegerType, StringType))
       .add("mapOfStringAndArray", MapType(StringType, arrayOfString))
@@ -114,7 +114,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       .add("mapOfStructAndString", MapType(structOfString, StringType))
      .add("mapOfStruct", MapType(structOfString, structOfString)))

-  testMurmur3Hash(
+  testHash(
     new StructType()
       .add("structOfString", structOfString)
       .add("structOfStructOfString", new StructType().add("struct", structOfString))
@@ -124,11 +124,11 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
         new StructType().add("array", arrayOfString).add("map", mapOfString))
       .add("structOfUDT", structOfUDT))

-  private def testMurmur3Hash(inputSchema: StructType): Unit = {
+  private def testHash(inputSchema: StructType): Unit = {
     val inputGenerator = RandomDataGenerator.forType(inputSchema, nullable = false).get
     val encoder = RowEncoder(inputSchema)
     val seed = scala.util.Random.nextInt()
-    test(s"murmur3 hash: ${inputSchema.simpleString}") {
+    test(s"murmur3/xxHash64 hash: ${inputSchema.simpleString}") {
       for (_ <- 1 to 10) {
         val input = encoder.toRow(inputGenerator.apply().asInstanceOf[Row]).asInstanceOf[UnsafeRow]
         val literals = input.toSeq(inputSchema).zip(inputSchema.map(_.dataType)).map {
@@ -136,6 +136,7 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
         }
         // Only test the interpreted version has same result with codegen version.
         checkEvaluation(Murmur3Hash(literals, seed), Murmur3Hash(literals, seed).eval())
+        checkEvaluation(XxHash64(literals, seed), XxHash64(literals, seed).eval())
       }
     }
   }