[SPARK-2494] [PySpark] make hash of None consistant cross machines
In CPython, hash of None is different cross machines, it will cause wrong result during shuffle. This PR will fix this. Author: Davies Liu <davies.liu@gmail.com> Closes #1371 from davies/hash_of_none and squashes the following commits: d01745f [Davies Liu] add comments, remove outdated unit tests 5467141 [Davies Liu] disable hijack of hash, use it only for partitionBy() b7118aa [Davies Liu] use __builtin__ instead of __builtins__ 839e417 [Davies Liu] hijack hash to make hash of None consistant cross machines
This commit is contained in:
parent
f89cf65d7a
commit
872538c600
|
@ -48,6 +48,35 @@ from py4j.java_collections import ListConverter, MapConverter
|
|||
__all__ = ["RDD"]
|
||||
|
||||
|
||||
# TODO: for Python 3.3+, PYTHONHASHSEED should be reset to disable randomized
|
||||
# hash for string
|
||||
def portable_hash(x):
|
||||
"""
|
||||
This function returns consistant hash code for builtin types, especially
|
||||
for None and tuple with None.
|
||||
|
||||
The algrithm is similar to that one used by CPython 2.7
|
||||
|
||||
>>> portable_hash(None)
|
||||
0
|
||||
>>> portable_hash((None, 1))
|
||||
219750521
|
||||
"""
|
||||
if x is None:
|
||||
return 0
|
||||
if isinstance(x, tuple):
|
||||
h = 0x345678
|
||||
for i in x:
|
||||
h ^= portable_hash(i)
|
||||
h *= 1000003
|
||||
h &= 0xffffffff
|
||||
h ^= len(x)
|
||||
if h == -1:
|
||||
h = -2
|
||||
return h
|
||||
return hash(x)
|
||||
|
||||
|
||||
def _extract_concise_traceback():
|
||||
"""
|
||||
This function returns the traceback info for a callsite, returns a dict
|
||||
|
@ -1164,7 +1193,9 @@ class RDD(object):
|
|||
return python_right_outer_join(self, other, numPartitions)
|
||||
|
||||
# TODO: add option to control map-side combining
|
||||
def partitionBy(self, numPartitions, partitionFunc=None):
|
||||
# portable_hash is used as default, because builtin hash of None is different
|
||||
# cross machines.
|
||||
def partitionBy(self, numPartitions, partitionFunc=portable_hash):
|
||||
"""
|
||||
Return a copy of the RDD partitioned using the specified partitioner.
|
||||
|
||||
|
@ -1176,8 +1207,6 @@ class RDD(object):
|
|||
if numPartitions is None:
|
||||
numPartitions = self._defaultReducePartitions()
|
||||
|
||||
if partitionFunc is None:
|
||||
partitionFunc = lambda x: 0 if x is None else hash(x)
|
||||
# Transferring O(n) objects to Java is too expensive. Instead, we'll
|
||||
# form the hash buckets in Python, transferring O(numPartitions) objects
|
||||
# to Java. Each object is a (splitNumber, [objects]) pair.
|
||||
|
|
Loading…
Reference in a new issue