Change numSplits to numPartitions in PySpark.

Josh Rosen 2013-02-24 13:25:09 -08:00
parent 3b9f929467
commit 2c966c98fb
2 changed files with 38 additions and 38 deletions
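For callers, the change is a straight parameter rename; positional arguments are unaffected, but keyword callers must switch from numSplits to numPartitions. A minimal before/after sketch, assuming a live SparkContext named sc as in the doctests below:

from operator import add

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
# Before this commit: rdd.reduceByKey(add, numSplits=2)
# After this commit, the same call spells the keyword numPartitions:
counts = rdd.reduceByKey(add, numPartitions=2)
print sorted(counts.collect())   # [('a', 2), ('b', 1)]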

python/pyspark/join.py

@@ -32,13 +32,13 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
-def _do_python_join(rdd, other, numSplits, dispatch):
+def _do_python_join(rdd, other, numPartitions, dispatch):
vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))
-return vs.union(ws).groupByKey(numSplits).flatMapValues(dispatch)
+return vs.union(ws).groupByKey(numPartitions).flatMapValues(dispatch)
-def python_join(rdd, other, numSplits):
+def python_join(rdd, other, numPartitions):
def dispatch(seq):
vbuf, wbuf = [], []
for (n, v) in seq:
@@ -47,10 +47,10 @@ def python_join(rdd, other, numSplits):
elif n == 2:
wbuf.append(v)
return [(v, w) for v in vbuf for w in wbuf]
-return _do_python_join(rdd, other, numSplits, dispatch)
+return _do_python_join(rdd, other, numPartitions, dispatch)
-def python_right_outer_join(rdd, other, numSplits):
+def python_right_outer_join(rdd, other, numPartitions):
def dispatch(seq):
vbuf, wbuf = [], []
for (n, v) in seq:
@@ -61,10 +61,10 @@ def python_right_outer_join(rdd, other, numSplits):
if not vbuf:
vbuf.append(None)
return [(v, w) for v in vbuf for w in wbuf]
-return _do_python_join(rdd, other, numSplits, dispatch)
+return _do_python_join(rdd, other, numPartitions, dispatch)
-def python_left_outer_join(rdd, other, numSplits):
+def python_left_outer_join(rdd, other, numPartitions):
def dispatch(seq):
vbuf, wbuf = [], []
for (n, v) in seq:
@@ -75,10 +75,10 @@ def python_left_outer_join(rdd, other, numSplits):
if not wbuf:
wbuf.append(None)
return [(v, w) for v in vbuf for w in wbuf]
-return _do_python_join(rdd, other, numSplits, dispatch)
+return _do_python_join(rdd, other, numPartitions, dispatch)
-def python_cogroup(rdd, other, numSplits):
+def python_cogroup(rdd, other, numPartitions):
vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))
def dispatch(seq):
@@ -89,4 +89,4 @@ def python_cogroup(rdd, other, numSplits):
elif n == 2:
wbuf.append(v)
return (vbuf, wbuf)
-return vs.union(ws).groupByKey(numSplits).mapValues(dispatch)
+return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch)
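The tag-and-dispatch pattern above is plain Python; the following standalone sketch (hypothetical sample data, no Spark needed) shows how tagging each side's values with 1 or 2 and grouping by key reproduces an inner join:

def inner_dispatch(seq):
    vbuf, wbuf = [], []
    for (n, v) in seq:
        if n == 1:
            vbuf.append(v)
        elif n == 2:
            wbuf.append(v)
    return [(v, w) for v in vbuf for w in wbuf]

# Tag each side, then group by key -- the role groupByKey(numPartitions) plays above.
vs = [(k, (1, v)) for (k, v) in [("a", 1), ("b", 4)]]
ws = [(k, (2, w)) for (k, w) in [("a", 2), ("a", 3)]]
grouped = {}
for (k, tagged) in vs + ws:
    grouped.setdefault(k, []).append(tagged)
joined = [(k, pair) for k in grouped for pair in inner_dispatch(grouped[k])]
print sorted(joined)   # [('a', (1, 2)), ('a', (1, 3))]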

python/pyspark/rdd.py

@@ -215,7 +215,7 @@ class RDD(object):
yield pair
return java_cartesian.flatMap(unpack_batches)
-def groupBy(self, f, numSplits=None):
+def groupBy(self, f, numPartitions=None):
"""
Return an RDD of grouped items.
@@ -224,7 +224,7 @@ class RDD(object):
>>> sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2, 8]), (1, [1, 1, 3, 5])]
"""
-return self.map(lambda x: (f(x), x)).groupByKey(numSplits)
+return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
def pipe(self, command, env={}):
"""
@@ -274,7 +274,7 @@ class RDD(object):
def reduce(self, f):
"""
Reduces the elements of this RDD using the specified commutative and
associative binary operator.
>>> from operator import add
@@ -422,22 +422,22 @@ class RDD(object):
"""
return dict(self.collect())
-def reduceByKey(self, func, numSplits=None):
+def reduceByKey(self, func, numPartitions=None):
"""
Merge the values for each key using an associative reduce function.
This will also perform the merging locally on each mapper before
sending results to a reducer, similarly to a "combiner" in MapReduce.
-Output will be hash-partitioned with C{numSplits} splits, or the
-default parallelism level if C{numSplits} is not specified.
+Output will be hash-partitioned with C{numPartitions} partitions, or
+the default parallelism level if C{numPartitions} is not specified.
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
"""
-return self.combineByKey(lambda x: x, func, func, numSplits)
+return self.combineByKey(lambda x: x, func, func, numPartitions)
def reduceByKeyLocally(self, func):
"""
@@ -474,7 +474,7 @@ class RDD(object):
"""
return self.map(lambda x: x[0]).countByValue()
-def join(self, other, numSplits=None):
+def join(self, other, numPartitions=None):
"""
Return an RDD containing all pairs of elements with matching keys in
C{self} and C{other}.
@@ -489,9 +489,9 @@ class RDD(object):
>>> sorted(x.join(y).collect())
[('a', (1, 2)), ('a', (1, 3))]
"""
-return python_join(self, other, numSplits)
+return python_join(self, other, numPartitions)
-def leftOuterJoin(self, other, numSplits=None):
+def leftOuterJoin(self, other, numPartitions=None):
"""
Perform a left outer join of C{self} and C{other}.
@@ -506,9 +506,9 @@ class RDD(object):
>>> sorted(x.leftOuterJoin(y).collect())
[('a', (1, 2)), ('b', (4, None))]
"""
-return python_left_outer_join(self, other, numSplits)
+return python_left_outer_join(self, other, numPartitions)
-def rightOuterJoin(self, other, numSplits=None):
+def rightOuterJoin(self, other, numPartitions=None):
"""
Perform a right outer join of C{self} and C{other}.
@@ -523,10 +523,10 @@ class RDD(object):
>>> sorted(y.rightOuterJoin(x).collect())
[('a', (2, 1)), ('b', (None, 4))]
"""
-return python_right_outer_join(self, other, numSplits)
+return python_right_outer_join(self, other, numPartitions)
# TODO: add option to control map-side combining
-def partitionBy(self, numSplits, partitionFunc=hash):
+def partitionBy(self, numPartitions, partitionFunc=hash):
"""
Return a copy of the RDD partitioned using the specified partitioner.
@@ -535,22 +535,22 @@ class RDD(object):
>>> set(sets[0]).intersection(set(sets[1]))
set([])
"""
-if numSplits is None:
-numSplits = self.ctx.defaultParallelism
+if numPartitions is None:
+numPartitions = self.ctx.defaultParallelism
# Transferring O(n) objects to Java is too expensive. Instead, we'll
-# form the hash buckets in Python, transferring O(numSplits) objects
+# form the hash buckets in Python, transferring O(numPartitions) objects
# to Java. Each object is a (splitNumber, [objects]) pair.
def add_shuffle_key(split, iterator):
buckets = defaultdict(list)
for (k, v) in iterator:
-buckets[partitionFunc(k) % numSplits].append((k, v))
+buckets[partitionFunc(k) % numPartitions].append((k, v))
for (split, items) in buckets.iteritems():
yield str(split)
yield dump_pickle(Batch(items))
keyed = PipelinedRDD(self, add_shuffle_key)
keyed._bypass_serializer = True
pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
-partitioner = self.ctx._jvm.PythonPartitioner(numSplits,
+partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
id(partitionFunc))
jrdd = pairRDD.partitionBy(partitioner).values()
rdd = RDD(jrdd, self.ctx)
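The bucketing inside add_shuffle_key is ordinary modular hashing; a standalone sketch (hypothetical data, no JVM involved) of how partitionFunc and numPartitions assign keys to buckets:

from collections import defaultdict

def bucket(pairs, numPartitions, partitionFunc=hash):
    # Mirrors add_shuffle_key above: each key lands in partitionFunc(k) % numPartitions.
    buckets = defaultdict(list)
    for (k, v) in pairs:
        buckets[partitionFunc(k) % numPartitions].append((k, v))
    return dict(buckets)

print bucket([(0, 'x'), (1, 'y'), (2, 'z'), (3, 'w')], 2)
# {0: [(0, 'x'), (2, 'z')], 1: [(1, 'y'), (3, 'w')]}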
@@ -561,7 +561,7 @@ class RDD(object):
# TODO: add control over map-side aggregation
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
-numSplits=None):
+numPartitions=None):
"""
Generic function to combine the elements for each key using a custom
set of aggregation functions.
@@ -586,8 +586,8 @@ class RDD(object):
>>> sorted(x.combineByKey(str, add, add).collect())
[('a', '11'), ('b', '1')]
"""
-if numSplits is None:
-numSplits = self.ctx.defaultParallelism
+if numPartitions is None:
+numPartitions = self.ctx.defaultParallelism
def combineLocally(iterator):
combiners = {}
for (k, v) in iterator:
@@ -597,7 +597,7 @@ class RDD(object):
combiners[k] = mergeValue(combiners[k], v)
return combiners.iteritems()
locally_combined = self.mapPartitions(combineLocally)
-shuffled = locally_combined.partitionBy(numSplits)
+shuffled = locally_combined.partitionBy(numPartitions)
def _mergeCombiners(iterator):
combiners = {}
for (k, v) in iterator:
@@ -609,10 +609,10 @@ class RDD(object):
return shuffled.mapPartitions(_mergeCombiners)
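With the renamed argument, an explicit partition count can be passed straight through combineByKey. A short usage sketch, assuming a live SparkContext sc; the per-key average is illustrative and not part of this commit:

pairs = sc.parallelize([("a", 1), ("a", 3), ("b", 2)])
sums = pairs.combineByKey(
    lambda v: (v, 1),                          # createCombiner
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1]),   # mergeCombiners
    numPartitions=2)
averages = sums.mapValues(lambda (total, count): total / float(count))
print sorted(averages.collect())   # [('a', 2.0), ('b', 2.0)]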
# TODO: support variant with custom partitioner
-def groupByKey(self, numSplits=None):
+def groupByKey(self, numPartitions=None):
"""
Group the values for each key in the RDD into a single sequence.
-Hash-partitions the resulting RDD with into numSplits partitions.
+Hash-partitions the resulting RDD with into numPartitions partitions.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(x.groupByKey().collect())
@@ -630,7 +630,7 @@ class RDD(object):
return a + b
return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
-numSplits)
+numPartitions)
# TODO: add tests
def flatMapValues(self, f):
@@ -659,7 +659,7 @@ class RDD(object):
return self.cogroup(other)
# TODO: add variant with custom parittioner
-def cogroup(self, other, numSplits=None):
+def cogroup(self, other, numPartitions=None):
"""
For each key k in C{self} or C{other}, return a resulting RDD that
contains a tuple with the list of values for that key in C{self} as well
@@ -670,7 +670,7 @@ class RDD(object):
>>> sorted(x.cogroup(y).collect())
[('a', ([1], [2])), ('b', ([4], []))]
"""
-return python_cogroup(self, other, numSplits)
+return python_cogroup(self, other, numPartitions)
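cogroup follows the same pattern; a brief usage sketch with the renamed argument, again assuming a live SparkContext sc as in the doctest above:

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
# Same result as the doctest above, with an explicit partition count.
print sorted(x.cogroup(y, numPartitions=2).collect())
# [('a', ([1], [2])), ('b', ([4], []))]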
# TODO: `lookup` is disabled because we can't make direct comparisons based
# on the key; we need to compare the hash of the key to the hash of the