diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 7210439509..9020be166a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global @@ -127,9 +127,9 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { /** Listener that collects information on processed batches */ class BatchInfoCollector extends StreamingListener { - val batchInfosCompleted = new ArrayBuffer[BatchInfo] - val batchInfosStarted = new ArrayBuffer[BatchInfo] - val batchInfosSubmitted = new ArrayBuffer[BatchInfo] + val batchInfosCompleted = new ArrayBuffer[BatchInfo] with SynchronizedBuffer[BatchInfo] + val batchInfosStarted = new ArrayBuffer[BatchInfo] with SynchronizedBuffer[BatchInfo] + val batchInfosSubmitted = new ArrayBuffer[BatchInfo] with SynchronizedBuffer[BatchInfo] override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { batchInfosSubmitted += batchSubmitted.batchInfo @@ -146,9 +146,10 @@ class BatchInfoCollector extends StreamingListener { /** Listener that collects information on processed batches */ class ReceiverInfoCollector extends StreamingListener { - val startedReceiverStreamIds = new ArrayBuffer[Int] - val stoppedReceiverStreamIds = new ArrayBuffer[Int]() - val receiverErrors = new ArrayBuffer[(Int, String, String)]() + val startedReceiverStreamIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int] + val stoppedReceiverStreamIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int] + val receiverErrors = + new ArrayBuffer[(Int, String, String)] with SynchronizedBuffer[(Int, String, String)] override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { startedReceiverStreamIds += receiverStarted.receiverInfo.streamId