From fd6e51deec83f01be3db41e84255329eedbe15da Mon Sep 17 00:00:00 2001
From: folone
Date: Sun, 20 Jan 2013 17:02:58 +0100
Subject: [PATCH] Fixed the failing test.

---
 .../test/scala/spark/AccumulatorSuite.scala   | 33 +++++++++----------
 1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index d8be99dde7..9f5335978f 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -13,6 +13,20 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
 
   var sc: SparkContext = null
 
+  implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
+    def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
+      t1 ++= t2
+      t1
+    }
+    def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
+      t1 += t2
+      t1
+    }
+    def zero(t: mutable.Set[A]) : mutable.Set[A] = {
+      new mutable.HashSet[A]()
+    }
+  }
+
   after {
     if (sc != null) {
       sc.stop()
@@ -40,7 +54,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
   }
 
   test ("add value to collection accumulators") {
-    import SetAccum._
     val maxI = 1000
     for (nThreads <- List(1, 10)) { //test single & multi-threaded
       sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -60,22 +73,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     }
   }
 
-  implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] {
-    def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = {
-      t1 ++= t2
-      t1
-    }
-    def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = {
-      t1 += t2
-      t1
-    }
-    def zero(t: mutable.Set[Any]) : mutable.Set[Any] = {
-      new mutable.HashSet[Any]()
-    }
-  }
-
   test ("value not readable in tasks") {
-    import SetAccum._
     val maxI = 1000
     for (nThreads <- List(1, 10)) { //test single & multi-threaded
       sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -123,7 +121,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
   }
 
   test ("localValue readable in tasks") {
-    import SetAccum._
     val maxI = 1000
     for (nThreads <- List(1, 10)) { //test single & multi-threaded
       sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -135,7 +132,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       }
       acc.value should be ( (0 to maxI).toSet)
      sc.stop()
-      sc = null
+      sc = null
     }
   }
 }
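
Note (not part of the patch): the change replaces a file-local `implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any]` with a class-level generic `implicit def setAccum[A]`, so the individual tests no longer need `import SetAccum._`. A minimal, self-contained sketch of the same pattern follows; it uses a hypothetical `AccumParamLike` trait as a stand-in for Spark's `AccumulableParam`, purely for illustration.

import scala.collection.mutable

// Stand-in for Spark's AccumulableParam, so the sketch compiles on its own.
trait AccumParamLike[R, T] {
  def addAccumulator(acc: R, elem: T): R
  def addInPlace(r1: R, r2: R): R
  def zero(initial: R): R
}

object ImplicitResolutionSketch {
  // Generic implicit def, mirroring the setAccum[A] added by the patch:
  // one definition covers every element type A instead of only Any.
  implicit def setAccum[A]: AccumParamLike[mutable.Set[A], A] =
    new AccumParamLike[mutable.Set[A], A] {
      def addAccumulator(acc: mutable.Set[A], elem: A): mutable.Set[A] = { acc += elem; acc }
      def addInPlace(r1: mutable.Set[A], r2: mutable.Set[A]): mutable.Set[A] = { r1 ++= r2; r1 }
      def zero(initial: mutable.Set[A]): mutable.Set[A] = new mutable.HashSet[A]()
    }

  def main(args: Array[String]): Unit = {
    // Because setAccum is a member of the enclosing scope, the compiler
    // materializes setAccum[Int] here with no per-use-site import,
    // which is what lets the patch drop the `import SetAccum._` lines.
    val param = implicitly[AccumParamLike[mutable.Set[Int], Int]]
    val s = param.addAccumulator(param.zero(mutable.HashSet.empty[Int]), 42)
    println(s) // Set(42)
  }
}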