Fixed the failing test.
This commit is contained in:
parent
ad8aff6ca4
commit
fd6e51deec
|
@ -13,6 +13,20 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
|
|||
|
||||
var sc: SparkContext = null
|
||||
|
||||
implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
|
||||
def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
|
||||
t1 ++= t2
|
||||
t1
|
||||
}
|
||||
def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
|
||||
t1 += t2
|
||||
t1
|
||||
}
|
||||
def zero(t: mutable.Set[A]) : mutable.Set[A] = {
|
||||
new mutable.HashSet[A]()
|
||||
}
|
||||
}
|
||||
|
||||
after {
|
||||
if (sc != null) {
|
||||
sc.stop()
|
||||
|
@ -40,7 +54,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
|
|||
}
|
||||
|
||||
test ("add value to collection accumulators") {
|
||||
import SetAccum._
|
||||
val maxI = 1000
|
||||
for (nThreads <- List(1, 10)) { //test single & multi-threaded
|
||||
sc = new SparkContext("local[" + nThreads + "]", "test")
|
||||
|
@ -60,22 +73,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
|
|||
}
|
||||
}
|
||||
|
||||
implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] {
|
||||
def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = {
|
||||
t1 ++= t2
|
||||
t1
|
||||
}
|
||||
def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = {
|
||||
t1 += t2
|
||||
t1
|
||||
}
|
||||
def zero(t: mutable.Set[Any]) : mutable.Set[Any] = {
|
||||
new mutable.HashSet[Any]()
|
||||
}
|
||||
}
|
||||
|
||||
test ("value not readable in tasks") {
|
||||
import SetAccum._
|
||||
val maxI = 1000
|
||||
for (nThreads <- List(1, 10)) { //test single & multi-threaded
|
||||
sc = new SparkContext("local[" + nThreads + "]", "test")
|
||||
|
@ -123,7 +121,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
|
|||
}
|
||||
|
||||
test ("localValue readable in tasks") {
|
||||
import SetAccum._
|
||||
val maxI = 1000
|
||||
for (nThreads <- List(1, 10)) { //test single & multi-threaded
|
||||
sc = new SparkContext("local[" + nThreads + "]", "test")
|
||||
|
@ -135,7 +132,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
|
|||
}
|
||||
acc.value should be ( (0 to maxI).toSet)
|
||||
sc.stop()
|
||||
sc = null
|
||||
sc = null
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue