[SPARK-17817][PYSPARK] PySpark RDD Repartitioning Results in Highly Skewed Partition Sizes

## What changes were proposed in this pull request?

Quoted from JIRA description:

Calling repartition on a PySpark RDD to increase the number of partitions results in highly skewed partition sizes, with most having 0 rows. The repartition method should evenly spread out the rows across the partitions, and this behavior is correctly seen on the Scala side.

See the following code for a reproducible example of this issue:

    num_partitions = 20000
    a = sc.parallelize(range(int(1e6)), 2)  # start with 2 even partitions
    l = a.repartition(num_partitions).glom().map(len).collect()  # get length of each partition
    min(l), max(l), sum(l)/len(l), len(l)  # skewed!
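
For the numbers above: 1e6 rows over 20,000 partitions should average about 50 rows each, yet on affected versions most entries of `l` are 0 while a few partitions hold nearly all the rows.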

In Scala's `repartition` code, elements are distributed evenly across the output partitions. However, a Python RDD reaches the JVM as batches of pickled objects, each batch a single opaque binary blob, so the distribution shuffles only a handful of large elements and most output partitions end up empty. The fix is to convert the Python RDD to an RDD of Java objects before repartitioning.
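
For users stuck on an unpatched release, one workaround is to shuffle through a keyed `partitionBy`, which keys individual records rather than whole pickled batches. A minimal sketch, not part of this patch; the helper name `repartition_evenly` and the use of random keys are our assumptions:

    import random

    def repartition_evenly(rdd, num_partitions):
        # Hypothetical helper, not a Spark API. Tag each element with a
        # random key in [0, num_partitions); partitionBy shuffles records
        # individually, so rows spread evenly. Drop the keys afterwards.
        keyed = rdd.map(lambda x: (random.randrange(num_partitions), x))
        return keyed.partitionBy(num_partitions).values()

    # e.g. repartition_evenly(a, 20000).glom().map(len).collect() returns
    # roughly even partition sizes instead of mostly zeros.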

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #15389 from viirya/pyspark-rdd-repartition.
Committed 2016-10-11 11:43:24 -07:00 by Felix Cheung (authored by Liang-Chi Hsieh)
commit 07508bd01d, parent 75b9e35141
2 changed files with 20 additions and 3 deletions

python/pyspark/rdd.py

    @@ -2017,8 +2017,7 @@ class RDD(object):
             >>> len(rdd.repartition(10).glom().collect())
             10
             """
    -        jrdd = self._jrdd.repartition(numPartitions)
    -        return RDD(jrdd, self.ctx, self._jrdd_deserializer)
    +        return self.coalesce(numPartitions, shuffle=True)
     
         def coalesce(self, numPartitions, shuffle=False):
             """
    @@ -2029,7 +2028,15 @@ class RDD(object):
             >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
             [[1, 2, 3, 4, 5]]
             """
    -        jrdd = self._jrdd.coalesce(numPartitions, shuffle)
    +        if shuffle:
    +            # Scala's repartition distributes elements evenly across output
    +            # partitions. However, a Python RDD arrives there as opaque batches
    +            # of pickled bytes, so the distribution fails and produces highly
    +            # skewed partitions. Convert it to an RDD of Java objects before
    +            # repartitioning.
    +            data_java_rdd = self._to_java_object_rdd().coalesce(numPartitions, shuffle)
    +            jrdd = self.ctx._jvm.SerDeUtil.javaToPython(data_java_rdd)
    +        else:
    +            jrdd = self._jrdd.coalesce(numPartitions, shuffle)
             return RDD(jrdd, self.ctx, self._jrdd_deserializer)
     
         def zip(self, other):
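
The shuffle branch works by round-tripping through the JVM: `_to_java_object_rdd()` unpickles the batched Python data into an RDD of Java objects, so Scala's `coalesce` shuffles individual records and can balance them, and `SerDeUtil.javaToPython` pickles the result back for Python. The extra serialization cost is confined to the `shuffle=True` path; a plain narrowing `coalesce` is unaffected.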

python/pyspark/tests.py

    @@ -914,6 +914,16 @@ class RDDTests(ReusedPySparkTestCase):
             self.assertEqual(partitions[0], [(0, 5), (0, 8), (2, 6)])
             self.assertEqual(partitions[1], [(1, 3), (3, 8), (3, 8)])
     
    +    def test_repartition_no_skewed(self):
    +        num_partitions = 20
    +        a = self.sc.parallelize(range(int(1000)), 2)
    +        l = a.repartition(num_partitions).glom().map(len).collect()
    +        zeros = len([x for x in l if x == 0])
    +        self.assertTrue(zeros == 0)
    +        l = a.coalesce(num_partitions, True).glom().map(len).collect()
    +        zeros = len([x for x in l if x == 0])
    +        self.assertTrue(zeros == 0)
    +
         def test_distinct(self):
             rdd = self.sc.parallelize((1, 2, 3)*10, 10)
             self.assertEqual(rdd.getNumPartitions(), 10)