[SPARK-35512][PYTHON] Fix OverflowError(cannot convert float infinity to integer) in partitionBy function

### What changes were proposed in this pull request?
Limit the batch size for `add_shuffle_key` in `partitionBy` function to fix `OverflowError: cannot convert float infinity to integer`

### Why are the changes needed?
It's not easy to write a UT, but I can use some simple code to explain the bug.
* Original code
```
        def add_shuffle_key(split, iterator):

            buckets = defaultdict(list)
            c, batch = 0, min(10 * numPartitions, 1000)

            for k, v in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
                c += 1

                # check used memory and avg size of chunk of objects
                if (c % 1000 == 0 and get_used_memory() > limit
                        or c > batch):
                    n, size = len(buckets), 0
                    for split in list(buckets.keys()):
                        yield pack_long(split)
                        d = outputSerializer.dumps(buckets[split])
                        del buckets[split]
                        yield d
                        size += len(d)

                    avg = int(size / n) >> 20
                    # let 1M < avg < 10M
                    if avg < 1:
                        batch *= 1.5
                    elif avg > 10:
                        batch = max(int(batch / 1.5), 1)
                    c = 0
```
if `get_used_memory() > limit` always is `True` and `avg < 1` always is `True`, the variable `batch` will grow to infinity. then `batch = max(int(batch / 1.5), 1)` may raise `OverflowError` if `avg > 10` at some time.
* sample code to reproduce the bug
```
import sys

limit = 100
used_memory = 200
numPartitions = 64
c, batch = 0, min(10 * numPartitions, 1000)

while True:
    c += 1
    if (c % 1000 == 0 and used_memory > limit or c > batch):
        batch = batch * 1.5
        d = max(int(batch / 1.5), 1)
        print(c, batch)
```

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
It's not easy to write a UT, there is sample code to test
```
import sys

limit = 100
used_memory = 200
numPartitions = 64
c, batch = 0, min(10 * numPartitions, 1000)

while True:
    c += 1
    if (c % 1000 == 0 and used_memory > limit or c > batch):
        batch = min(sys.maxsize, batch * 1.5)
        d = max(int(batch / 1.5), 1)
        print(c, batch)
```

Closes #32667 from nolanliou/fix_partitionby.

Authored-by: liuqi <nolan.liou@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
This commit is contained in:
liuqi 2021-06-09 10:57:27 +09:00 committed by Hyukjin Kwon
parent 93a9dc479c
commit e79dd89cf6

View file

@ -2067,7 +2067,7 @@ class RDD(object):
avg = int(size / n) >> 20
# let 1M < avg < 10M
if avg < 1:
batch *= 1.5
batch = min(sys.maxsize, batch * 1.5)
elif avg > 10:
batch = max(int(batch / 1.5), 1)
c = 0