[SPARK-15086][CORE][STREAMING] Deprecate old Java accumulator API

## What changes were proposed in this pull request?

- Deprecate the old Java accumulator API; Java callers should now use the new accumulator API on the underlying (Scala) `SparkContext` (a migration sketch follows this list)
- Update Java tests and examples
- Drop duplicate tests of the old accumulator API from the Java 8 test suites
- (also fix the "accumuable" misspelling in doc comments)
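
For Java users, the change amounts to swapping the deprecated `JavaSparkContext` accumulator helpers for the new accumulator API reached through `sc()`. A minimal before/after sketch (the class name, app name, and local master setting are illustrative only, not part of this patch):

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorMigrationSketch {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setAppName("accumulator-migration").setMaster("local[2]"));

    // Before (deprecated by this change):
    //   Accumulator<Integer> accum = jsc.intAccumulator(0);
    // After: reach the underlying SparkContext and ask for a LongAccumulator.
    LongAccumulator accum = jsc.sc().longAccumulator();

    jsc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));

    // Only the driver should read the value; here it is 10.
    System.out.println(accum.value());

    jsc.stop();
  }
}
```

The named variants map the same way, e.g. `jsc.sc().longAccumulator("counter")` in place of `jsc.intAccumulator(0, "counter")`, as reflected in the `@deprecated` messages below.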

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #13606 from srowen/SPARK-15086.
Authored by Sean Owen on 2016-06-12 11:44:33 -07:00; committed by Reynold Xin.
Commit f51dfe616b (parent 50248dcfff).
10 changed files with 33 additions and 90 deletions.


@ -1245,7 +1245,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
* with `+=`. Only the driver can access the accumuable's `value`.
* with `+=`. Only the driver can access the accumulable's `value`.
* @tparam R accumulator result type
* @tparam T type that can be added to the accumulator
*/
@ -1259,8 +1259,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
* Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
* access the accumuable's `value`.
* Spark UI. Tasks can add values to the accumulable using the `+=` operator. Only the driver can
* access the accumulable's `value`.
* @tparam R accumulator result type
* @tparam T type that can be added to the accumulator
*/


@ -530,6 +530,7 @@ class JavaSparkContext(val sc: SparkContext)
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
@deprecated("use sc().longAccumulator()", "2.0.0")
def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] =
sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]]
@ -539,6 +540,7 @@ class JavaSparkContext(val sc: SparkContext)
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
@deprecated("use sc().longAccumulator(String)", "2.0.0")
def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
sc.accumulator(initialValue, name)(IntAccumulatorParam)
.asInstanceOf[Accumulator[java.lang.Integer]]
@ -547,6 +549,7 @@ class JavaSparkContext(val sc: SparkContext)
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
@deprecated("use sc().doubleAccumulator()", "2.0.0")
def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] =
sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]
@ -556,6 +559,7 @@ class JavaSparkContext(val sc: SparkContext)
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
@deprecated("use sc().doubleAccumulator(String)", "2.0.0")
def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
sc.accumulator(initialValue, name)(DoubleAccumulatorParam)
.asInstanceOf[Accumulator[java.lang.Double]]
@ -564,6 +568,7 @@ class JavaSparkContext(val sc: SparkContext)
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
@deprecated("use sc().longAccumulator()", "2.0.0")
def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)
/**
@ -572,6 +577,7 @@ class JavaSparkContext(val sc: SparkContext)
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
@deprecated("use sc().longAccumulator(String)", "2.0.0")
def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
intAccumulator(initialValue, name)
@ -579,6 +585,7 @@ class JavaSparkContext(val sc: SparkContext)
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
@deprecated("use sc().doubleAccumulator()", "2.0.0")
def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
doubleAccumulator(initialValue)
@ -589,6 +596,7 @@ class JavaSparkContext(val sc: SparkContext)
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
@deprecated("use sc().doubleAccumulator(String)", "2.0.0")
def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
doubleAccumulator(initialValue, name)
@ -613,7 +621,7 @@ class JavaSparkContext(val sc: SparkContext)
/**
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
* can "add" values with `add`. Only the master can access the accumuable's `value`.
* can "add" values with `add`. Only the master can access the accumulable's `value`.
*/
@deprecated("use AccumulatorV2", "2.0.0")
def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
@ -621,7 +629,7 @@ class JavaSparkContext(val sc: SparkContext)
/**
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
* can "add" values with `add`. Only the master can access the accumuable's `value`.
* can "add" values with `add`. Only the master can access the accumulable's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/


@ -70,6 +70,7 @@ import org.apache.spark.partial.PartialResult;
import org.apache.spark.rdd.RDD;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.StatCounter;
// The test suite itself is Serializable so that anonymous Function implementations can be
@ -287,7 +288,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void foreach() {
final Accumulator<Integer> accum = sc.accumulator(0);
final LongAccumulator accum = sc.sc().longAccumulator();
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
rdd.foreach(new VoidFunction<String>() {
@Override
@ -300,7 +301,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void foreachPartition() {
final Accumulator<Integer> accum = sc.accumulator(0);
final LongAccumulator accum = sc.sc().longAccumulator();
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
@Override
@ -1377,6 +1378,7 @@ public class JavaAPISuite implements Serializable {
assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
}
@SuppressWarnings("deprecation")
@Test
public void accumulators() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));


@ -1394,7 +1394,7 @@ Note that, when programmers define their own type of AccumulatorV2, the resultin
<div data-lang="java" markdown="1">
{% highlight java %}
Accumulator<Integer> accum = sc.accumulator(0);
LongAccumulator accum = sc.sc().longAccumulator();
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
@ -1485,7 +1485,7 @@ data.map { x => accum += x; x }
<div data-lang="java" markdown="1">
{% highlight java %}
Accumulator<Integer> accum = sc.accumulator(0);
LongAccumulator accum = sc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.
{% endhighlight %}


@ -1452,13 +1452,13 @@ class JavaWordBlacklist {
class JavaDroppedWordsCounter {
private static volatile Accumulator<Integer> instance = null;
private static volatile LongAccumulator instance = null;
public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
public static LongAccumulator getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaDroppedWordsCounter.class) {
if (instance == null) {
instance = jsc.accumulator(0, "WordsInBlacklistCounter");
instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
}
}
}
@ -1472,7 +1472,7 @@ wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>()
// Get or register the blacklist Broadcast
final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
// Get or register the droppedWordsCounter Accumulator
final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
final LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
// Use blacklist to drop words and use droppedWordsCounter to count them
String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
@Override


@ -29,7 +29,6 @@ import scala.Tuple2;
import com.google.common.io.Files;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
@ -41,6 +40,7 @@ import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.LongAccumulator;
/**
* Use this singleton to get or register a Broadcast variable.
@ -67,13 +67,13 @@ class JavaWordBlacklist {
*/
class JavaDroppedWordsCounter {
private static volatile Accumulator<Integer> instance = null;
private static volatile LongAccumulator instance = null;
public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
public static LongAccumulator getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaDroppedWordsCounter.class) {
if (instance == null) {
instance = jsc.accumulator(0, "WordsInBlacklistCounter");
instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
}
}
}
@ -158,7 +158,7 @@ public final class JavaRecoverableNetworkWordCount {
final Broadcast<List<String>> blacklist =
JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
// Get or register the droppedWordsCounter Accumulator
final Accumulator<Integer> droppedWordsCounter =
final LongAccumulator droppedWordsCounter =
JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
// Use blacklist to drop words and use droppedWordsCounter to count them
String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {


@ -33,8 +33,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.spark.Accumulator;
import org.apache.spark.AccumulatorParam;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@ -302,43 +300,6 @@ public class Java8RDDAPISuite implements Serializable {
Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
}
@Test
public void accumulators() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
Accumulator<Integer> intAccum = sc.intAccumulator(10);
rdd.foreach(intAccum::add);
Assert.assertEquals((Integer) 25, intAccum.value());
Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
rdd.foreach(x -> doubleAccum.add((double) x));
Assert.assertEquals((Double) 25.0, doubleAccum.value());
// Try a custom accumulator type
AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
@Override
public Float addInPlace(Float r, Float t) {
return r + t;
}
@Override
public Float addAccumulator(Float r, Float t) {
return r + t;
}
@Override
public Float zero(Float initialValue) {
return 0.0f;
}
};
Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
rdd.foreach(x -> floatAccum.add((float) x));
Assert.assertEquals((Float) 25.0f, floatAccum.value());
// Test the setValue method
floatAccum.setValue(5.0f);
Assert.assertEquals((Float) 5.0f, floatAccum.value());
}
@Test
public void keyBy() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));


@ -27,7 +27,6 @@ import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
import org.apache.spark.Accumulator;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.JavaPairRDD;
@ -361,33 +360,6 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
assertOrderInvariantEquals(expected, result);
}
@Test
public void testForeachRDD() {
final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0);
final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0);
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,1,1),
Arrays.asList(1,1,1));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
stream.foreachRDD(rdd -> {
accumRdd.add(1);
rdd.foreach(x -> accumEle.add(1));
});
// This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
stream.foreachRDD((rdd, time) -> {
return;
});
JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(2, accumRdd.value().intValue());
Assert.assertEquals(6, accumEle.value().intValue());
}
@Test
public void testPairFlatMap() {
List<List<String>> inputData = Arrays.asList(


@ -32,7 +32,6 @@ import com.google.common.base.Objects;
import org.junit.*;
import org.junit.rules.ExpectedException;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
@ -40,6 +39,7 @@ import org.apache.spark.sql.catalyst.encoders.OuterScopes;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.test.TestSparkSession;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.LongAccumulator;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.types.DataTypes.*;
@ -157,7 +157,7 @@ public class JavaDatasetSuite implements Serializable {
@Test
public void testForeach() {
final Accumulator<Integer> accum = jsc.accumulator(0);
final LongAccumulator accum = jsc.sc().longAccumulator();
List<String> data = Arrays.asList("a", "b", "c");
Dataset<String> ds = spark.createDataset(data, Encoders.STRING());


@ -36,7 +36,6 @@ import org.junit.Test;
import com.google.common.io.Files;
import com.google.common.collect.Sets;
import org.apache.spark.Accumulator;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@ -46,6 +45,7 @@ import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.Utils;
// The test suite itself is Serializable so that anonymous Function implementations can be
@ -794,8 +794,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@SuppressWarnings("unchecked")
@Test
public void testForeachRDD() {
final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0);
final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0);
final LongAccumulator accumRdd = ssc.sparkContext().sc().longAccumulator();
final LongAccumulator accumEle = ssc.sparkContext().sc().longAccumulator();
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,1,1),
Arrays.asList(1,1,1));