[SPARK-24076][SQL] Use different seed in HashAggregate to avoid hash conflict
## What changes were proposed in this pull request? HashAggregate uses the same hash algorithm and seed as ShuffleExchange, it may lead to bad hash conflict when shuffle.partitions=8192*n. Considering below example: ``` SET spark.sql.shuffle.partitions=8192; INSERT OVERWRITE TABLE target_xxx SELECT item_id, auct_end_dt FROM from source_xxx GROUP BY item_id, auct_end_dt; ``` In the shuffle stage, if user sets the shuffle.partition = 8192, all tuples in the same partition will meet the following relationship: ``` hash(tuple x) = hash(tuple y) + n * 8192 ``` Then in the next HashAggregate stage, all tuples from the same partition need be put into a 16K BytesToBytesMap (unsafeRowAggBuffer). Here, the HashAggregate uses the same hash algorithm on the same expression as shuffle, and uses the same seed, and 16K = 8192 * 2, so actually, all tuples in the same parititon will only be hashed to 2 different places in the BytesToBytesMap. It is bad hash conflict. With BytesToBytesMap growing, the conflict will always exist. Before change: <img width="334" alt="hash_conflict" src="https://user-images.githubusercontent.com/2989575/39250210-ed032d46-48d2-11e8-855a-c1afc2a0ceb5.png"> After change: <img width="334" alt="no_hash_conflict" src="https://user-images.githubusercontent.com/2989575/39250218-f1cb89e0-48d2-11e8-9244-5a93c1e8b60d.png"> ## How was this patch tested? Unit tests and production cases. Author: yucai <yyu1@ebay.com> Closes #21149 from yucai/SPARK-24076.
This commit is contained in:
parent
05eb19b6e0
commit
e17567ca78
|
@ -755,7 +755,10 @@ case class HashAggregateExec(
|
|||
}
|
||||
|
||||
// generate hash code for key
|
||||
val hashExpr = Murmur3Hash(groupingExpressions, 42)
|
||||
// SPARK-24076: HashAggregate uses the same hash algorithm on the same expressions
|
||||
// as ShuffleExchange, it may lead to bad hash conflict when shuffle.partitions=8192*n,
|
||||
// pick a different seed to avoid this conflict
|
||||
val hashExpr = Murmur3Hash(groupingExpressions, 48)
|
||||
val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)
|
||||
|
||||
val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter,
|
||||
|
|
Loading…
Reference in a new issue