Merge pull request #396 from JoshRosen/spark-653

Make PySpark AccumulatorParam an abstract base class
This commit is contained in:
Matei Zaharia 2013-01-24 13:05:03 -08:00
commit a2f4891d1d
2 changed files with 29 additions and 14 deletions

View file

@@ -25,7 +25,8 @@
>>> a.value >>> a.value
13 13
>>> class VectorAccumulatorParam(object): >>> from pyspark.accumulators import AccumulatorParam
>>> class VectorAccumulatorParam(AccumulatorParam):
... def zero(self, value): ... def zero(self, value):
... return [0.0] * len(value) ... return [0.0] * len(value)
... def addInPlace(self, val1, val2): ... def addInPlace(self, val1, val2):
@@ -90,8 +91,7 @@ class Accumulator(object):
While C{SparkContext} supports accumulators for primitive data types like C{int} and While C{SparkContext} supports accumulators for primitive data types like C{int} and
C{float}, users can also define accumulators for custom types by providing a custom C{float}, users can also define accumulators for custom types by providing a custom
C{AccumulatorParam} object with a C{zero} and C{addInPlace} method. Refer to the doctest L{AccumulatorParam} object. Refer to the doctest of this module for an example.
of this module for an example.
""" """
def __init__(self, aid, value, accum_param): def __init__(self, aid, value, accum_param):
@@ -134,7 +134,27 @@ class Accumulator(object):
return "Accumulator<id=%i, value=%s>" % (self.aid, self._value) return "Accumulator<id=%i, value=%s>" % (self.aid, self._value)
class AddingAccumulatorParam(object): class AccumulatorParam(object):
"""
Helper object that defines how to accumulate values of a given type.
"""
def zero(self, value):
"""
Provide a "zero value" for the type, compatible in dimensions with the
provided C{value} (e.g., a zero vector)
"""
raise NotImplementedError
def addInPlace(self, value1, value2):
"""
Add two values of the accumulator's data type, returning a new value;
for efficiency, can also update C{value1} in place and return it.
"""
raise NotImplementedError
class AddingAccumulatorParam(AccumulatorParam):
""" """
An AccumulatorParam that uses the + operators to add values. Designed for simple types An AccumulatorParam that uses the + operators to add values. Designed for simple types
such as integers, floats, and lists. Requires the zero value for the underlying type such as integers, floats, and lists. Requires the zero value for the underlying type

View file

@@ -165,16 +165,11 @@ class SparkContext(object):
def accumulator(self, value, accum_param=None): def accumulator(self, value, accum_param=None):
""" """
Create an C{Accumulator} with the given initial value, using a given Create an L{Accumulator} with the given initial value, using a given
AccumulatorParam helper object to define how to add values of the data L{AccumulatorParam} helper object to define how to add values of the
type if provided. Default AccumulatorParams are used for integers and data type if provided. Default AccumulatorParams are used for integers
floating-point numbers if you do not provide one. For other types, the and floating-point numbers if you do not provide one. For other types,
AccumulatorParam must implement two methods: a custom AccumulatorParam can be used.
- C{zero(value)}: provide a "zero value" for the type, compatible in
dimensions with the provided C{value} (e.g., a zero vector).
- C{addInPlace(val1, val2)}: add two values of the accumulator's data
type, returning a new value; for efficiency, can also update C{val1}
in place and return it.
""" """
if accum_param == None: if accum_param == None:
if isinstance(value, int): if isinstance(value, int):