[SPARK-15322][MLLIB][CORE][SQL] Update deprecated accumulator usage to AccumulatorV2 across the Spark project
## What changes were proposed in this pull request?

I used IntelliJ IDEA to search for usages of the deprecated SparkContext.accumulator API across the whole Spark project, and updated the code to the AccumulatorV2 API (except test code that exercises the deprecated accumulator methods themselves).

## How was this patch tested?

Existing unit tests.

Author: WeichenXu <WeichenXu123@outlook.com>

Closes #13112 from WeichenXu123/update_accuV2_in_mllib.
This commit is contained in: commit 2f9047b5eb (parent 33814f887a)
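The patch applies one mechanical pattern throughout: deprecated `sc.accumulator(zeroValue)` calls become the typed AccumulatorV2 factories `sc.longAccumulator` / `sc.doubleAccumulator`, and `acc += v` becomes `acc.add(v)`. A minimal, self-contained sketch of the before/after style (the object name, app name, and numbers are illustrative, not from the patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorV2Sketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))

    // Deprecated Spark 1.x style replaced by this patch:
    //   val acc = sc.accumulator(0)                // Accumulator[Int]
    //   sc.parallelize(1 to 100).foreach(i => acc += i)

    // AccumulatorV2 style introduced by this patch:
    val acc = sc.longAccumulator("sum")            // org.apache.spark.util.LongAccumulator
    sc.parallelize(1 to 100).foreach(i => acc.add(i))
    assert(acc.value == 5050)                      // merged value is read on the driver
    sc.stop()
  }
}
```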
```diff
@@ -65,9 +65,9 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
   test("foreachAsync") {
     zeroPartRdd.foreachAsync(i => Unit).get()
 
-    val accum = sc.accumulator(0)
+    val accum = sc.longAccumulator
     sc.parallelize(1 to 1000, 3).foreachAsync { i =>
-      accum += 1
+      accum.add(1)
     }.get()
     assert(accum.value === 1000)
   }
@@ -75,9 +75,9 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
   test("foreachPartitionAsync") {
     zeroPartRdd.foreachPartitionAsync(iter => Unit).get()
 
-    val accum = sc.accumulator(0)
+    val accum = sc.longAccumulator
     sc.parallelize(1 to 1000, 9).foreachPartitionAsync { iter =>
-      accum += 1
+      accum.add(1)
     }.get()
     assert(accum.value === 9)
   }
```
```diff
@@ -23,11 +23,12 @@ import java.nio.charset.Charset
 
 import com.google.common.io.Files
 
-import org.apache.spark.{Accumulator, SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
 import org.apache.spark.util.IntParam
+import org.apache.spark.util.LongAccumulator
 
 /**
  * Use this singleton to get or register a Broadcast variable.
@@ -54,13 +55,13 @@ object WordBlacklist {
  */
 object DroppedWordsCounter {
 
-  @volatile private var instance: Accumulator[Long] = null
+  @volatile private var instance: LongAccumulator = null
 
-  def getInstance(sc: SparkContext): Accumulator[Long] = {
+  def getInstance(sc: SparkContext): LongAccumulator = {
     if (instance == null) {
       synchronized {
         if (instance == null) {
-          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+          instance = sc.longAccumulator("WordsInBlacklistCounter")
         }
       }
     }
@@ -124,7 +125,7 @@ object RecoverableNetworkWordCount {
     // Use blacklist to drop words and use droppedWordsCounter to count them
     val counts = rdd.filter { case (word, count) =>
       if (blacklist.value.contains(word)) {
-        droppedWordsCounter += count
+        droppedWordsCounter.add(count)
         false
       } else {
         true
```
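Beyond replacing `Accumulator[Long]` one-for-one, `org.apache.spark.util.LongAccumulator` also tracks the count and average of the added values. A minimal sketch of that API (the names and numbers here are illustrative, not from the patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LongAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val dropped = sc.longAccumulator("droppedWords") // named accumulators appear in the web UI
    sc.parallelize(Seq(3L, 4L)).foreach(n => dropped.add(n))
    // LongAccumulator exposes sum, count, and avg in addition to value.
    assert(dropped.sum == 7L && dropped.count == 2L && dropped.avg == 3.5)
    sc.stop()
  }
}
```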
```diff
@@ -19,7 +19,8 @@ package org.apache.spark.ml.util
 
 import scala.collection.mutable
 
-import org.apache.spark.{Accumulator, SparkContext}
+import org.apache.spark.SparkContext
+import org.apache.spark.util.LongAccumulator
 
 /**
  * Abstract class for stopwatches.
@@ -102,12 +103,12 @@ private[spark] class DistributedStopwatch(
     sc: SparkContext,
     override val name: String) extends Stopwatch {
 
-  private val elapsedTime: Accumulator[Long] = sc.accumulator(0L, s"DistributedStopwatch($name)")
+  private val elapsedTime: LongAccumulator = sc.longAccumulator(s"DistributedStopwatch($name)")
 
   override def elapsed(): Long = elapsedTime.value
 
   override protected def add(duration: Long): Unit = {
-    elapsedTime += duration
+    elapsedTime.add(duration)
   }
 }
 
```
```diff
@@ -279,7 +279,7 @@ class KMeans private (
     }
 
     val activeCenters = activeRuns.map(r => centers(r)).toArray
-    val costAccums = activeRuns.map(_ => sc.accumulator(0.0))
+    val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
 
     val bcActiveCenters = sc.broadcast(activeCenters)
 
@@ -296,7 +296,7 @@ class KMeans private (
         points.foreach { point =>
           (0 until runs).foreach { i =>
             val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-            costAccums(i) += cost
+            costAccums(i).add(cost)
             val sum = sums(i)(bestCenter)
             axpy(1.0, point.vector, sum)
             counts(i)(bestCenter) += 1
```
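`sc.doubleAccumulator` is the AccumulatorV2 counterpart of the deprecated `sc.accumulator(0.0)` used for the K-means cost above. A minimal sketch (the values are illustrative, not from the patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DoubleAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val cost = sc.doubleAccumulator("cost")        // replaces the deprecated sc.accumulator(0.0)
    sc.parallelize(Seq(1.5, 2.5, 3.0)).foreach(c => cost.add(c))
    assert(cost.value == 7.0)                      // merged on the driver after the action completes
    sc.stop()
  }
}
```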
```diff
@@ -60,9 +60,9 @@ class StopwatchSuite extends SparkFunSuite with MLlibTestSparkContext {
   test("DistributedStopwatch on executors") {
     val sw = new DistributedStopwatch(sc, "sw")
     val rdd = sc.parallelize(0 until 4, 4)
-    val acc = sc.accumulator(0L)
+    val acc = sc.longAccumulator
     rdd.foreach { i =>
-      acc += checkStopwatch(sw)
+      acc.add(checkStopwatch(sw))
     }
     assert(!sw.isRunning)
     val elapsed = sw.elapsed()
@@ -88,12 +88,12 @@ class StopwatchSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(sw.toString ===
       s"{\n local: ${localElapsed}ms,\n spark: ${sparkElapsed}ms\n}")
     val rdd = sc.parallelize(0 until 4, 4)
-    val acc = sc.accumulator(0L)
+    val acc = sc.longAccumulator
     rdd.foreach { i =>
       sw("local").start()
       val duration = checkStopwatch(sw("spark"))
       sw("local").stop()
-      acc += duration
+      acc.add(duration)
     }
     val localElapsed2 = sw("local").elapsed()
     assert(localElapsed2 === localElapsed)
```
```diff
@@ -62,15 +62,15 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
 
   test("foreach") {
     val ds = Seq(1, 2, 3).toDS()
-    val acc = sparkContext.accumulator(0)
-    ds.foreach(acc += _)
+    val acc = sparkContext.longAccumulator
+    ds.foreach(acc.add(_))
     assert(acc.value == 6)
   }
 
   test("foreachPartition") {
     val ds = Seq(1, 2, 3).toDS()
-    val acc = sparkContext.accumulator(0)
-    ds.foreachPartition(_.foreach(acc +=))
+    val acc = sparkContext.longAccumulator
+    ds.foreachPartition(_.foreach(acc.add(_)))
     assert(acc.value == 6)
   }
 
```
```diff
@@ -207,15 +207,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
 
   test("foreach") {
     val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
-    val acc = sparkContext.accumulator(0)
-    ds.foreach(v => acc += v._2)
+    val acc = sparkContext.longAccumulator
+    ds.foreach(v => acc.add(v._2))
     assert(acc.value == 6)
   }
 
   test("foreachPartition") {
     val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
-    val acc = sparkContext.accumulator(0)
-    ds.foreachPartition(_.foreach(v => acc += v._2))
+    val acc = sparkContext.longAccumulator
+    ds.foreachPartition(_.foreach(v => acc.add(v._2)))
     assert(acc.value == 6)
   }
 
```
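When none of the built-in `longAccumulator` / `doubleAccumulator` / `collectionAccumulator` factories fits, the AccumulatorV2 API that this patch migrates to can be subclassed directly. A hedged sketch of a custom max-tracking accumulator (the class is illustrative only, not part of this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

// Illustrative custom accumulator: tracks the maximum Long seen across tasks.
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var _max = Long.MinValue
  override def isZero: Boolean = _max == Long.MinValue
  override def copy(): MaxAccumulator = { val acc = new MaxAccumulator; acc._max = _max; acc }
  override def reset(): Unit = _max = Long.MinValue
  override def add(v: Long): Unit = _max = math.max(_max, v)
  override def merge(other: AccumulatorV2[Long, Long]): Unit = _max = math.max(_max, other.value)
  override def value: Long = _max
}

object MaxAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val maxAcc = new MaxAccumulator
    sc.register(maxAcc, "max")                     // custom accumulators must be registered
    sc.parallelize(Seq(3L, 9L, 4L)).foreach(maxAcc.add)
    assert(maxAcc.value == 9L)
    sc.stop()
  }
}
```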