From bb4102b0eefd7321d1fadf9df6db79c8dd9880fb Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Thu, 25 Apr 2013 14:38:01 +0530 Subject: [PATCH] Fixed breaking tests in streaming checkpoint suite. Changed RichInt to Int as it is final and not serializable --- .../spark/streaming/CheckpointSuite.scala | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index cac86deeaf..f9285b19e2 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -1,16 +1,19 @@ package spark.streaming -import dstream.FileInputDStream -import spark.streaming.StreamingContext._ import java.io.File -import runtime.RichInt -import org.scalatest.BeforeAndAfter + +import scala.collection.mutable.ArrayBuffer + import org.apache.commons.io.FileUtils -import collection.mutable.{SynchronizedBuffer, ArrayBuffer} -import util.{Clock, ManualClock} -import scala.util.Random +import org.scalatest.BeforeAndAfter + import com.google.common.io.Files +import spark.streaming.StreamingContext.toPairDStreamFunctions +import spark.streaming.dstream.FileInputDStream +import spark.streaming.util.ManualClock + + /** * This test suites tests the checkpointing functionality of DStreams - @@ -56,13 +59,13 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Setup the streams val input = (1 to 10).map(_ => Seq("a")).toSeq val operation = (st: DStream[String]) => { - val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { - Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + Some((values.foldLeft(0)(_ + _) + state.getOrElse(0))) } st.map(x => (x, 1)) - .updateStateByKey[RichInt](updateFunc) + .updateStateByKey(updateFunc) .checkpoint(stateStreamCheckpointInterval) - .map(t => (t._1, t._2.self)) + .map(t => (t._1, t._2)) } var ssc = setupStreams(input, operation) var stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head @@ -162,13 +165,13 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val input = (1 to 10).map(_ => Seq("a")).toSeq val output = (1 to 10).map(x => Seq(("a", x))).toSeq val operation = (st: DStream[String]) => { - val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { - Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + Some((values.foldLeft(0)(_ + _) + state.getOrElse(0))) } st.map(x => (x, 1)) - .updateStateByKey[RichInt](updateFunc) + .updateStateByKey(updateFunc) .checkpoint(batchDuration * 2) - .map(t => (t._1, t._2.self)) + .map(t => (t._1, t._2)) } testCheckpointedOperation(input, operation, output, 7) } @@ -350,4 +353,4 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] outputStream.output } -} \ No newline at end of file +}