Merge pull request #427 from pwendell/deprecate-aggregator
Deprecate rather than remove old combineValuesByKey function
This commit is contained in:
commit
d601a76d1f
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.spark
|
||||
|
||||
import scala.{Option, deprecated}
|
||||
|
||||
import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
|
||||
|
||||
/**
|
||||
|
@ -34,8 +36,12 @@ case class Aggregator[K, V, C] (
|
|||
private val sparkConf = SparkEnv.get.conf
|
||||
private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
|
||||
|
||||
@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
|
||||
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
|
||||
combineValuesByKey(iter, null)
|
||||
|
||||
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
|
||||
context: TaskContext) : Iterator[(K, C)] = {
|
||||
context: TaskContext): Iterator[(K, C)] = {
|
||||
if (!externalSorting) {
|
||||
val combiners = new AppendOnlyMap[K,C]
|
||||
var kv: Product2[K, V] = null
|
||||
|
@ -53,12 +59,17 @@ case class Aggregator[K, V, C] (
|
|||
val (k, v) = iter.next()
|
||||
combiners.insert(k, v)
|
||||
}
|
||||
context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
|
||||
context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
|
||||
// TODO: Make this non optional in a future release
|
||||
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
|
||||
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
|
||||
combiners.iterator
|
||||
}
|
||||
}
|
||||
|
||||
@deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
|
||||
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] =
|
||||
combineCombinersByKey(iter, null)
|
||||
|
||||
def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
|
||||
if (!externalSorting) {
|
||||
val combiners = new AppendOnlyMap[K,C]
|
||||
|
@ -77,8 +88,9 @@ case class Aggregator[K, V, C] (
|
|||
val (k, c) = iter.next()
|
||||
combiners.insert(k, c)
|
||||
}
|
||||
context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
|
||||
context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
|
||||
// TODO: Make this non optional in a future release
|
||||
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
|
||||
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
|
||||
combiners.iterator
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue