From e79dd89cf6b513264d8205df1d4561cb07406d79 Mon Sep 17 00:00:00 2001 From: liuqi Date: Wed, 9 Jun 2021 10:57:27 +0900 Subject: [PATCH] [SPARK-35512][PYTHON] Fix OverflowError(cannot convert float infinity to integer) in partitionBy function ### What changes were proposed in this pull request? Limit the batch size for `add_shuffle_key` in the `partitionBy` function to fix `OverflowError: cannot convert float infinity to integer` ### Why are the changes needed? It's not easy to write a UT, but I can use some simple code to explain the bug. * Original code ``` def add_shuffle_key(split, iterator): buckets = defaultdict(list) c, batch = 0, min(10 * numPartitions, 1000) for k, v in iterator: buckets[partitionFunc(k) % numPartitions].append((k, v)) c += 1 # check used memory and avg size of chunk of objects if (c % 1000 == 0 and get_used_memory() > limit or c > batch): n, size = len(buckets), 0 for split in list(buckets.keys()): yield pack_long(split) d = outputSerializer.dumps(buckets[split]) del buckets[split] yield d size += len(d) avg = int(size / n) >> 20 # let 1M < avg < 10M if avg < 1: batch *= 1.5 elif avg > 10: batch = max(int(batch / 1.5), 1) c = 0 ``` If `get_used_memory() > limit` is always `True` and `avg < 1` is always `True`, the variable `batch` will grow toward infinity. Then `batch = max(int(batch / 1.5), 1)` may raise `OverflowError` if `avg > 10` at some point. * Sample code to reproduce the bug ``` import sys limit = 100 used_memory = 200 numPartitions = 64 c, batch = 0, min(10 * numPartitions, 1000) while True: c += 1 if (c % 1000 == 0 and used_memory > limit or c > batch): batch = batch * 1.5 d = max(int(batch / 1.5), 1) print(c, batch) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? 
It's not easy to write a UT, but here is sample code to test the fix: ``` import sys limit = 100 used_memory = 200 numPartitions = 64 c, batch = 0, min(10 * numPartitions, 1000) while True: c += 1 if (c % 1000 == 0 and used_memory > limit or c > batch): batch = min(sys.maxsize, batch * 1.5) d = max(int(batch / 1.5), 1) print(c, batch) ``` Closes #32667 from nolanliou/fix_partitionby. Authored-by: liuqi Signed-off-by: Hyukjin Kwon --- python/pyspark/rdd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 7e411de915..25ae52f630 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2067,7 +2067,7 @@ class RDD(object): avg = int(size / n) >> 20 # let 1M < avg < 10M if avg < 1: - batch *= 1.5 + batch = min(sys.maxsize, batch * 1.5) elif avg > 10: batch = max(int(batch / 1.5), 1) c = 0