Fixed streaming testsuite bugs

This commit is contained in:
Tathagata Das 2013-01-20 03:51:11 -08:00
parent 4f8fe58b25
commit 33bad85bb9
7 changed files with 24 additions and 6 deletions

View file

@ -34,12 +34,14 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() {
ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
ssc.checkpoint("checkpoint", new Duration(1000));
}
@After
public void tearDown() {
ssc.stop();
ssc = null;
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port");
}

View file

@ -8,6 +8,11 @@ class BasicOperationsSuite extends TestSuiteBase {
override def framework() = "BasicOperationsSuite"
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port")
}
test("map") {
val input = Seq(1 to 4, 5 to 8, 9 to 12)
testOperation(

View file

@ -15,9 +15,11 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
after {
if (ssc != null) ssc.stop()
FileUtils.deleteDirectory(new File(checkpointDir))
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port")
}
var ssc: StreamingContext = null
@ -26,8 +28,6 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
override def batchDuration = Milliseconds(500)
override def checkpointDir = "checkpoint"
override def checkpointInterval = batchDuration
override def actuallyWait = true

View file

@ -22,6 +22,9 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter {
after {
FailureSuite.reset()
FileUtils.deleteDirectory(new File(checkpointDir))
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port")
}
override def framework = "CheckpointSuite"

View file

@ -40,6 +40,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
FileUtils.deleteDirectory(testDir)
testDir = null
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port")
}
test("network input stream") {

View file

@ -10,7 +10,7 @@ import collection.mutable.SynchronizedBuffer
import java.io.{ObjectInputStream, IOException}
import org.scalatest.FunSuite
import org.scalatest.{BeforeAndAfter, FunSuite}
/**
* This is a input stream just for the testsuites. This is equivalent to a checkpointable,
@ -56,7 +56,7 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
* This is the base trait for Spark Streaming testsuites. This provides basic functionality
* to run user-defined set of input on user-defined stream operations, and verify the output.
*/
trait TestSuiteBase extends FunSuite with Logging {
trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
def framework = "TestSuiteBase"
@ -64,7 +64,7 @@ trait TestSuiteBase extends FunSuite with Logging {
def batchDuration = Seconds(1)
def checkpointDir = null.asInstanceOf[String]
def checkpointDir = "checkpoint"
def checkpointInterval = batchDuration

View file

@ -11,6 +11,11 @@ class WindowOperationsSuite extends TestSuiteBase {
override def batchDuration = Seconds(1)
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port")
}
val largerSlideInput = Seq(
Seq(("a", 1)),
Seq(("a", 2)), // 1st window from here