diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 0c35c66680..94ba22306a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -48,6 +48,35 @@ from py4j.java_collections import ListConverter, MapConverter
 
 __all__ = ["RDD"]
 
+# TODO: for Python 3.3+, PYTHONHASHSEED should be reset to disable randomized
+# hash for string
+def portable_hash(x):
+    """
+    This function returns a consistent hash code for builtin types, especially
+    for None and tuples containing None.
+
+    The algorithm is similar to the one used by CPython 2.7.
+
+    >>> portable_hash(None)
+    0
+    >>> portable_hash((None, 1))
+    219750521
+    """
+    if x is None:
+        return 0
+    if isinstance(x, tuple):
+        h = 0x345678
+        for i in x:
+            h ^= portable_hash(i)
+            h *= 1000003
+            h &= 0xffffffff
+        h ^= len(x)
+        if h == -1:
+            h = -2
+        return h
+    return hash(x)
+
+
 def _extract_concise_traceback():
     """
     This function returns the traceback info for a callsite, returns a dict
@@ -1164,7 +1193,9 @@ class RDD(object):
         return python_right_outer_join(self, other, numPartitions)
 
     # TODO: add option to control map-side combining
-    def partitionBy(self, numPartitions, partitionFunc=None):
+    # portable_hash is used as the default, because the builtin hash of None
+    # differs across machines.
+    def partitionBy(self, numPartitions, partitionFunc=portable_hash):
        """
         Return a copy of the RDD partitioned using the specified partitioner.
 
@@ -1176,8 +1207,6 @@ class RDD(object):
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
 
-        if partitionFunc is None:
-            partitionFunc = lambda x: 0 if x is None else hash(x)
         # Transferring O(n) objects to Java is too expensive. Instead, we'll
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java. Each object is a (splitNumber, [objects]) pair.
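
For reference, a minimal usage sketch of the new default (assuming an existing
SparkContext bound to `sc`, e.g. from the pyspark shell; the sample data is
illustrative, not part of the patch):

    # `sc` is assumed to be an already-running SparkContext.
    pairs = sc.parallelize([(None, 1), (None, 2), ("a", 3)])

    # partitionBy now hashes each key with portable_hash by default, so records
    # sharing a key (including None) land in the same partition on every worker,
    # whereas the builtin hash() of None is address-based in CPython 2.7 and
    # varies across processes.
    print(pairs.partitionBy(2).glom().collect())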