[SPARK-13339][DOCS] Clarify commutative / associative operator requirements for reduce, fold

Clarify that reduce functions need to be commutative, and fold functions do not

See https://github.com/apache/spark/pull/11091

Author: Sean Owen <sowen@cloudera.com>

Closes #11217 from srowen/SPARK-13339.
Sean Owen 2016-02-19 10:26:38 +00:00
parent c776fce99b
commit fb7e21797e
14 changed files with 68 additions and 69 deletions

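The distinction this commit documents can be shown with a few lines of plain Scala (an illustrative sketch added here for context, not part of the diff): string concatenation is associative but not commutative, while integer addition is both, and only functions with both properties are safe when Spark may merge partial results in an arbitrary order.

// Plain Scala, no Spark required.
val add = (a: Int, b: Int) => a + b
assert(add(add(1, 2), 3) == add(1, add(2, 3)))  // associative
assert(add(1, 2) == add(2, 1))                  // commutative: safe for reduce-style operations

val concat = (a: String, b: String) => a + b
assert(concat(concat("a", "b"), "c") == concat("a", concat("b", "c")))  // associative
assert(concat("a", "b") != concat("b", "a"))    // NOT commutative: result depends on merge order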
@@ -305,11 +305,11 @@ setMethod("groupByKey",
 #' Merge values by key
 #'
 #' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-#' and merges the values for each key using an associative reduce function.
+#' and merges the values for each key using an associative and commutative reduce function.
 #'
 #' @param x The RDD to reduce by key. Should be an RDD where each element is
 #' list(K, V) or c(K, V).
-#' @param combineFunc The associative reduce function to use.
+#' @param combineFunc The associative and commutative reduce function to use.
 #' @param numPartitions Number of partitions to create.
 #' @return An RDD where each element is list(K, V') where V' is the merged
 #' value
@@ -347,12 +347,12 @@ setMethod("reduceByKey",
 #' Merge values by key locally
 #'
 #' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-#' and merges the values for each key using an associative reduce function, but return the
-#' results immediately to the driver as an R list.
+#' and merges the values for each key using an associative and commutative reduce function, but
+#' return the results immediately to the driver as an R list.
 #'
 #' @param x The RDD to reduce by key. Should be an RDD where each element is
 #' list(K, V) or c(K, V).
-#' @param combineFunc The associative reduce function to use.
+#' @param combineFunc The associative and commutative reduce function to use.
 #' @return A list of elements of type list(K, V') where V' is the merged value for each key
 #' @seealso reduceByKey
 #' @examples

@@ -29,9 +29,9 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
 /**
 * A simpler value of [[Accumulable]] where the result type being accumulated is the same
 * as the types of elements being merged, i.e. variables that are only "added" to through an
-* associative operation and can therefore be efficiently supported in parallel. They can be used
-* to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
-* value types, and programmers can add support for new types.
+* associative and commutative operation and can therefore be efficiently supported in parallel.
+* They can be used to implement counters (as in MapReduce) or sums. Spark natively supports
+* accumulators of numeric value types, and programmers can add support for new types.
 *
 * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
 * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.

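As a usage illustration of the doc comment above (a minimal sketch, not part of the commit; it assumes a live SparkContext named `sc`, for example in spark-shell, and uses the `SparkContext#accumulator` / `+=` / `value` API the comment refers to):

// Sketch only: `sc` is an assumed, already-running SparkContext.
val sum = sc.accumulator(0)                       // created from an initial value `v` = 0
sc.parallelize(1 to 100).foreach(x => sum += x)   // tasks only "add"; + is associative and commutative
println(sum.value)                                // 5050, read back on the driver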
@@ -278,17 +278,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
 /**
-* Merge the values for each key using an associative reduce function. This will also perform
-* the merging locally on each mapper before sending results to a reducer, similarly to a
-* "combiner" in MapReduce.
+* Merge the values for each key using an associative and commutative reduce function. This will
+* also perform the merging locally on each mapper before sending results to a reducer, similarly
+* to a "combiner" in MapReduce.
 */
 def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
 fromRDD(rdd.reduceByKey(partitioner, func))
 /**
-* Merge the values for each key using an associative reduce function, but return the results
-* immediately to the master as a Map. This will also perform the merging locally on each mapper
-* before sending results to a reducer, similarly to a "combiner" in MapReduce.
+* Merge the values for each key using an associative and commutative reduce function, but return
+* the result immediately to the master as a Map. This will also perform the merging locally on
+* each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
 */
 def reduceByKeyLocally(func: JFunction2[V, V, V]): java.util.Map[K, V] =
 mapAsSerializableJavaMap(rdd.reduceByKeyLocally(func))
@@ -381,9 +381,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 fromRDD(rdd.foldByKey(zeroValue)(func))
 /**
-* Merge the values for each key using an associative reduce function. This will also perform
-* the merging locally on each mapper before sending results to a reducer, similarly to a
-* "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
+* Merge the values for each key using an associative and commutative reduce function. This will
+* also perform the merging locally on each mapper before sending results to a reducer, similarly
+* to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
 */
 def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V] =
 fromRDD(rdd.reduceByKey(func, numPartitions))
@@ -461,10 +461,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 fromRDD(rdd.partitionBy(partitioner))
 /**
-* Merge the values for each key using an associative reduce function. This will also perform
-* the merging locally on each mapper before sending results to a reducer, similarly to a
-* "combiner" in MapReduce.
+* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
 */
 def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
 fromRDD(rdd.join(other, partitioner))
@@ -520,9 +520,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 }
 /**
-* Merge the values for each key using an associative reduce function. This will also perform
-* the merging locally on each mapper before sending results to a reducer, similarly to a
-* "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+* Merge the values for each key using an associative and commutative reduce function. This will
+* also perform the merging locally on each mapper before sending results to a reducer, similarly
+* to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 */
 def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {

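The combiner-style behavior described in these doc changes can be sketched as follows (Scala API for brevity; the Java wrapper above delegates to it; assumes a SparkContext `sc`):

// Sketch only: `sc` is an assumed SparkContext.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 3)))
// Because _ + _ is associative and commutative, partial sums can first be merged
// on each mapper (the "combiner") and then merged again across partitions.
val counts = pairs.reduceByKey(_ + _)
counts.collect()   // Array(("a", 4), ("b", 1)); element order may vary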
@@ -373,7 +373,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 /**
 * Aggregate the elements of each partition, and then the results for all the partitions, using a
-* given associative and commutative function and a neutral "zero value". The function
+* given associative function and a neutral "zero value". The function
 * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
 * allocation; however, it should not modify t2.
 *

@@ -300,27 +300,27 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 }
 /**
-* Merge the values for each key using an associative reduce function. This will also perform
-* the merging locally on each mapper before sending results to a reducer, similarly to a
-* "combiner" in MapReduce.
+* Merge the values for each key using an associative and commutative reduce function. This will
+* also perform the merging locally on each mapper before sending results to a reducer, similarly
+* to a "combiner" in MapReduce.
 */
 def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
 combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
 }
 /**
-* Merge the values for each key using an associative reduce function. This will also perform
-* the merging locally on each mapper before sending results to a reducer, similarly to a
-* "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
+* Merge the values for each key using an associative and commutative reduce function. This will
+* also perform the merging locally on each mapper before sending results to a reducer, similarly
+* to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
 */
 def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
 reduceByKey(new HashPartitioner(numPartitions), func)
 }
 /**
-* Merge the values for each key using an associative reduce function. This will also perform
-* the merging locally on each mapper before sending results to a reducer, similarly to a
-* "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+* Merge the values for each key using an associative and commutative reduce function. This will
+* also perform the merging locally on each mapper before sending results to a reducer, similarly
+* to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 */
 def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
@@ -328,9 +328,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 }
 /**
-* Merge the values for each key using an associative reduce function, but return the results
-* immediately to the master as a Map. This will also perform the merging locally on each mapper
-* before sending results to a reducer, similarly to a "combiner" in MapReduce.
+* Merge the values for each key using an associative and commutative reduce function, but return
+* the results immediately to the master as a Map. This will also perform the merging locally on
+* each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
 */
 def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
 val cleanedF = self.sparkContext.clean(func)

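To see why commutativity, and not just associativity, is now called out for reduceByKey and reduceByKeyLocally, consider this hedged sketch (assumes a SparkContext `sc`):

// Sketch only: `sc` is an assumed SparkContext; three partitions force cross-partition merging.
val pairs = sc.parallelize(Seq(("k", "x"), ("k", "y"), ("k", "z")), numSlices = 3)

// String concatenation is associative but NOT commutative: partial results from different
// partitions may be merged in any order, so "xyz", "zxy", ... are all possible outcomes.
val concatenated = pairs.reduceByKeyLocally(_ + _)

// A function that is both associative and commutative (keep the maximum) is deterministic.
val byMax = pairs.reduceByKeyLocally((a, b) => if (a >= b) a else b)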
@@ -973,7 +973,7 @@ abstract class RDD[T: ClassTag](
 /**
 * Aggregate the elements of each partition, and then the results for all the partitions, using a
-* given associative and commutative function and a neutral "zero value". The function
+* given associative function and a neutral "zero value". The function
 * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
 * allocation; however, it should not modify t2.
 *

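A brief sketch of the fold contract described above (assumes a SparkContext `sc`; the mutable-buffer variant, shown with aggregate, illustrates the note that op may modify and return t1 but not t2):

// Sketch only: `sc` is an assumed SparkContext.
val nums = sc.parallelize(1 to 100, numSlices = 4)

// 0 is a neutral "zero value" for +; each partition is folded, then the partition
// results are folded with the same associative function.
val total = nums.fold(0)(_ + _)   // 5050

// op may modify its first argument to avoid allocation, e.g. when accumulating into a buffer.
import scala.collection.mutable.ArrayBuffer
val gathered = nums.aggregate(ArrayBuffer.empty[Int])(
  (buf, x) => { buf += x; buf },   // modifies t1 (the buffer), never t2
  (b1, b2) => { b1 ++= b2; b1 }
)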
@@ -1343,7 +1343,7 @@ value of the broadcast variable (e.g. if the variable is shipped to a new node l
 ## Accumulators
-Accumulators are variables that are only "added" to through an associative operation and can
+Accumulators are variables that are only "added" to through an associative and commutative operation and can
 therefore be efficiently supported in parallel. They can be used to implement counters (as in
 MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers
 can add support for new types. If accumulators are created with a name, they will be

@@ -798,7 +798,7 @@ Some of the common ones are as follows.
 <td> <b>reduce</b>(<i>func</i>) </td>
 <td> Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the
 source DStream using a function <i>func</i> (which takes two arguments and returns one).
-The function should be associative so that it can be computed in parallel. </td>
+The function should be associative and commutative so that it can be computed in parallel. </td>
 </tr>
 <tr>
 <td> <b>countByValue</b>() </td>
@@ -1072,7 +1072,7 @@ said two parameters - <i>windowLength</i> and <i>slideInterval</i>.
 <tr>
 <td> <b>reduceByWindow</b>(<i>func</i>, <i>windowLength</i>, <i>slideInterval</i>) </td>
 <td> Return a new single-element stream, created by aggregating elements in the stream over a
-sliding interval using <i>func</i>. The function should be associative so that it can be computed
+sliding interval using <i>func</i>. The function should be associative and commutative so that it can be computed
 correctly in parallel.
 </td>
 </tr>

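The two table entries above translate to roughly the following Scala (an illustrative sketch: `ssc` is an assumed StreamingContext and `lines` an assumed input DStream[String]):

// Sketch only: `lines` is an assumed DStream[String] obtained from `ssc`.
import org.apache.spark.streaming.Seconds

val ones = lines.map(_ => 1L)

// reduce(func): func takes two values and returns one, and should be associative
// and commutative so partitions can be combined in parallel.
val perBatchTotal = ones.reduce(_ + _)

// reduceByWindow(func, windowLength, slideInterval): the same requirement applies.
val last30sTotal = ones.reduceByWindow(_ + _, Seconds(30), Seconds(10))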
@@ -844,8 +844,7 @@ class RDD(object):
 def fold(self, zeroValue, op):
 """
 Aggregate the elements of each partition, and then the results for all
-the partitions, using a given associative and commutative function and
-a neutral "zero value."
+the partitions, using a given associative function and a neutral "zero value."
 The function C{op(t1, t2)} is allowed to modify C{t1} and return it
 as its result value to avoid object allocation; however, it should not
@@ -1558,7 +1557,7 @@ class RDD(object):
 def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
 """
-Merge the values for each key using an associative reduce function.
+Merge the values for each key using an associative and commutative reduce function.
 This will also perform the merging locally on each mapper before
 sending results to a reducer, similarly to a "combiner" in MapReduce.
@@ -1576,7 +1575,7 @@ class RDD(object):
 def reduceByKeyLocally(self, func):
 """
-Merge the values for each key using an associative reduce function, but
+Merge the values for each key using an associative and commutative reduce function, but
 return the results immediately to the master as a dictionary.
 This will also perform the merging locally on each mapper before

@@ -453,7 +453,7 @@ class DStream(object):
 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 This is more efficient than `invReduceFunc` is None.
-@param reduceFunc: associative reduce function
+@param reduceFunc: associative and commutative reduce function
 @param invReduceFunc: inverse reduce function of `reduceFunc`
 @param windowDuration: width of the window; must be a multiple of this DStream's
 batching interval
@@ -524,7 +524,7 @@ class DStream(object):
 `invFunc` can be None, then it will reduce all the RDDs in window, could be slower
 than having `invFunc`.
-@param func: associative reduce function
+@param func: associative and commutative reduce function
 @param invFunc: inverse function of `reduceFunc`
 @param windowDuration: width of the window; must be a multiple of this DStream's
 batching interval

@@ -214,7 +214,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 /**
 * Return a new DStream in which each RDD has a single element generated by reducing all
 * elements in a sliding window over this DStream.
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
 * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -234,7 +234,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 /**
 * Return a new DStream in which each RDD has a single element generated by reducing all
 * elements in a sliding window over this DStream.
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
 * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -257,7 +257,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 * This is more efficient than reduceByWindow without "inverse reduce" function.
 * However, it is applicable to only "invertible reduce functions".
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param invReduceFunc inverse reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval

@@ -138,8 +138,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 /**
 * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-* merged using the associative reduce function. Hash partitioning is used to generate the RDDs
-* with Spark's default number of partitions.
+* merged using the associative and commutative reduce function. Hash partitioning is used to
+* generate the RDDs with Spark's default number of partitions.
 */
 def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =
 dstream.reduceByKey(func)
@@ -257,7 +257,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
 * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
 * the RDDs with Spark's default number of partitions.
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
 */
@@ -270,7 +270,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
 * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
 * generate the RDDs with Spark's default number of partitions.
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
 * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -289,7 +289,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
 * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
 * generate the RDDs with `numPartitions` partitions.
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
 * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -309,7 +309,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 /**
 * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
 * `DStream.reduceByKey()`, but applies it over a sliding window.
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
 * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -335,7 +335,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
 * However, it is applicable to only "invertible reduce functions".
 * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param invReduceFunc inverse function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
@@ -360,7 +360,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
 * However, it is applicable to only "invertible reduce functions".
 * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param invReduceFunc inverse function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
@@ -397,7 +397,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
 * However, it is applicable to only "invertible reduce functions".
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param invReduceFunc inverse function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval

@@ -791,7 +791,7 @@ abstract class DStream[T: ClassTag] (
 /**
 * Return a new DStream in which each RDD has a single element generated by reducing all
 * elements in a sliding window over this DStream.
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
 * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -814,7 +814,7 @@ abstract class DStream[T: ClassTag] (
 * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 * This is more efficient than reduceByWindow without "inverse reduce" function.
 * However, it is applicable to only "invertible reduce functions".
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param invReduceFunc inverse reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval

@@ -75,8 +75,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 /**
 * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-* merged using the associative reduce function. Hash partitioning is used to generate the RDDs
-* with Spark's default number of partitions.
+* merged using the associative and commutative reduce function. Hash partitioning is used to
+* generate the RDDs with Spark's default number of partitions.
 */
 def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
 reduceByKey(reduceFunc, defaultPartitioner())
@@ -204,7 +204,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
 * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
 * the RDDs with Spark's default number of partitions.
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
 */
@@ -219,7 +219,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
 * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
 * generate the RDDs with Spark's default number of partitions.
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
 * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -238,7 +238,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
 * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
 * generate the RDDs with `numPartitions` partitions.
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
 * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -259,7 +259,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 /**
 * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
 * `DStream.reduceByKey()`, but applies it over a sliding window.
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
 * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -289,7 +289,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
 * However, it is applicable to only "invertible reduce functions".
 * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param invReduceFunc inverse reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
@@ -320,7 +320,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
 * However, it is applicable to only "invertible reduce functions".
-* @param reduceFunc associative reduce function
+* @param reduceFunc associative and commutative reduce function
 * @param invReduceFunc inverse reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 * batching interval
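To make the "inverse reduce" contract above concrete, a hedged sketch (assumes a checkpoint-enabled StreamingContext and an input DStream `wordOnes` of (word, 1L) pairs): addition is the associative and commutative reduce function and subtraction is its inverse, so each slide only adds the new batch and subtracts the batch that left the window.

// Sketch only: `wordOnes` is an assumed DStream[(String, Long)]; checkpointing must be enabled.
import org.apache.spark.streaming.Seconds

val windowedCounts = wordOnes.reduceByKeyAndWindow(
  (a: Long, b: Long) => a + b,   // reduce: associative and commutative
  (a: Long, b: Long) => a - b,   // inverse reduce: "subtract" values leaving the window
  Seconds(60),                   // window duration
  Seconds(10)                    // slide duration
)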