spark-instrumented-optimizer/python
Nicholas Hwang a803ac3e06 [SPARK-9021] [PYSPARK] Change RDD.aggregate() to do reduce(mapPartitions()) instead of mapPartitions.fold()
I'm relatively new to Spark and functional programming, so forgive me if this pull request is just a result of my misunderstanding of how Spark should be used.

Currently, if one happens to use a mutable object as `zeroValue` for `RDD.aggregate()`, possibly unexpected behavior can occur.

This is because pyspark's current implementation of `RDD.aggregate()` does not serialize or make a copy of `zeroValue` before handing it off to `RDD.mapPartitions(...).fold(...)`. This results in a single reference to `zeroValue` being used for both `RDD.mapPartitions()` and `RDD.fold()` on each partition. This can result in strange accumulator values being fed into each partition's call to `RDD.fold()`, as the `zeroValue` may have been changed in-place during the `RDD.mapPartitions()` call.

As an illustrative example, submit the following to `spark-submit`:
```
from pyspark import SparkConf, SparkContext
import collections

def updateCounter(acc, val):
    print 'update acc:', acc
    print 'update val:', val
    acc[val] += 1
    return acc

def comboCounter(acc1, acc2):
    print 'combo acc1:', acc1
    print 'combo acc2:', acc2
    acc1.update(acc2)
    return acc1

def main():
    conf = SparkConf().setMaster("local").setAppName("Aggregate with Counter")
    sc = SparkContext(conf = conf)

    print '======= AGGREGATING with ONE PARTITION ======='
    print sc.parallelize(range(1,10), 1).aggregate(collections.Counter(), updateCounter, comboCounter)

    print '======= AGGREGATING with TWO PARTITIONS ======='
    print sc.parallelize(range(1,10), 2).aggregate(collections.Counter(), updateCounter, comboCounter)

if __name__ == "__main__":
    main()
```

One probably expects this to output the following:
```
Counter({1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1})
```

But it instead outputs this (regardless of the number of partitions):
```
Counter({1: 2, 2: 2, 3: 2, 4: 2, 5: 2, 6: 2, 7: 2, 8: 2, 9: 2})
```

This is because (I believe) `zeroValue` gets passed correctly to each partition, but after `RDD.mapPartitions()` completes, the `zeroValue` object has been updated and is then passed to `RDD.fold()`, which results in all items being double-counted within each partition before being finally reduced at the calling node.

I realize that this type of calculation is typically done by `RDD.mapPartitions(...).reduceByKey(...)`, but hopefully this illustrates some potentially confusing behavior. I also noticed that other `RDD` methods use this `deepcopy` approach to creating unique copies of `zeroValue` (i.e., `RDD.aggregateByKey()` and `RDD.foldByKey()`), and that the Scala implementations do seem to serialize the `zeroValue` object appropriately to prevent this type of behavior.

Author: Nicholas Hwang <moogling@gmail.com>

Closes #7378 from njhwang/master and squashes the following commits:

659bb27 [Nicholas Hwang] Fixed RDD.aggregate() to perform a reduce operation on collected mapPartitions results, similar to how fold currently is implemented. This prevents an initial combOp being performed on each partition with zeroValue (which leads to unexpected behavior if zeroValue is a mutable object) before being combOp'ed with other partition results.
8d8d694 [Nicholas Hwang] Changed dict construction to be compatible with Python 2.6 (cannot use list comprehensions to make dicts)
56eb2ab [Nicholas Hwang] Fixed whitespace after colon to conform with PEP8
391de4a [Nicholas Hwang] Removed used of collections.Counter from RDD tests for Python 2.6 compatibility; used defaultdict(int) instead. Merged treeAggregate test with mutable zero value into aggregate test to reduce code duplication.
2fa4e4b [Nicholas Hwang] Merge branch 'master' of https://github.com/njhwang/spark
ba528bd [Nicholas Hwang] Updated comments regarding protection of zeroValue from mutation in RDD.aggregate(). Added regression tests for aggregate(), fold(), aggregateByKey(), foldByKey(), and treeAggregate(), all with both 1 and 2 partition RDDs. Confirmed that aggregate() is the only problematic implementation as of commit 257236c3e1. Also replaced some parallelizations of ranges with xranges, per the documentation's recommendations of preferring xrange over range.
7820391 [Nicholas Hwang] Updated comments regarding protection of zeroValue from mutation in RDD.aggregate(). Added regression tests for aggregate(), fold(), aggregateByKey(), foldByKey(), and treeAggregate(), all with both 1 and 2 partition RDDs. Confirmed that aggregate() is the only problematic implementation as of commit 257236c3e1.
90d1544 [Nicholas Hwang] Made sure RDD.aggregate() makes a deepcopy of zeroValue for all partitions; this ensures that the mapPartitions call works with unique copies of zeroValue in each partition, and prevents a single reference to zeroValue being used for both map and fold calls on each partition (resulting in possibly unexpected behavior).
2015-07-19 10:30:28 -07:00
..
docs [SPARK-7879] [MLLIB] KMeans API for spark.ml Pipelines 2015-07-17 18:30:04 -07:00
lib [SPARK-2305] [PySpark] Update Py4J to version 0.8.2.1 2014-07-29 19:02:06 -07:00
pyspark [SPARK-9021] [PYSPARK] Change RDD.aggregate() to do reduce(mapPartitions()) instead of mapPartitions.fold() 2015-07-19 10:30:28 -07:00
test_support [SPARK-8060] Improve DataFrame Python test coverage and documentation. 2015-06-03 00:23:34 -07:00
.gitignore [SPARK-3946] gitignore in /python includes wrong directory 2014-10-14 14:09:39 -07:00
run-tests [SPARK-8583] [SPARK-5482] [BUILD] Refactor python/run-tests to integrate with dev/run-tests module system 2015-06-27 20:24:34 -07:00
run-tests.py [SPARK-8450] [SQL] [PYSARK] cleanup type converter for Python DataFrame 2015-07-08 18:22:53 -07:00