[SPARK-7356] [STREAMING] Fix flakey tests in FlumePollingStreamSuite using SparkSink's batch CountDownLatch.

This is meant to make the FlumePollingStreamSuite deterministic. Now we basically count the number of batches that have been completed - and then verify the results rather than sleeping for random periods of time.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #5918 from harishreedharan/flume-test-fix and squashes the following commits:

93f24f3 [Hari Shreedharan] Add an eventually block to ensure that all received data is processed. Refactor the dstream creation and remove redundant code.
1108804 [Hari Shreedharan] [SPARK-7356][STREAMING] Fix flakey tests in FlumePollingStreamSuite using SparkSink's batch CountDownLatch.
This commit is contained in:
Hari Shreedharan 2015-05-13 16:43:30 -07:00 committed by Andrew Or
parent bb6dec3b16
commit 61d1e87c0d

View file

@@ -18,15 +18,18 @@
package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
import java.util.concurrent._
import scala.collection.JavaConversions._
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.flume.Context
import org.apache.flume.channel.MemoryChannel
import org.apache.flume.conf.Configurables
import org.apache.flume.event.EventBuilder
import org.scalatest.concurrent.Eventually._
import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -57,11 +60,11 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
before(beforeFunction())
ignore("flume polling test") {
test("flume polling test") {
testMultipleTimes(testFlumePolling)
}
ignore("flume polling test multiple hosts") {
test("flume polling test multiple hosts") {
testMultipleTimes(testFlumePollingMultipleHost)
}
@@ -100,18 +103,8 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
Configurables.configure(sink, context)
sink.setChannel(channel)
sink.start()
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", sink.getPort())),
StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()
ssc.start()
writeAndVerify(Seq(channel), ssc, outputBuffer)
writeAndVerify(Seq(sink), Seq(channel))
assertChannelIsEmpty(channel)
sink.stop()
channel.stop()
@@ -142,21 +135,8 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
Configurables.configure(sink2, context)
sink2.setChannel(channel2)
sink2.start()
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val addresses = Seq(sink.getPort(), sink2.getPort()).map(new InetSocketAddress("localhost", _))
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
eventsPerBatch, 5)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()
ssc.start()
try {
writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
writeAndVerify(Seq(sink, sink2), Seq(channel, channel2))
assertChannelIsEmpty(channel)
assertChannelIsEmpty(channel2)
} finally {
@@ -167,28 +147,39 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
}
}
def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]]) {
def writeAndVerify(sinks: Seq[SparkSink], channels: Seq[MemoryChannel]) {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val addresses = sinks.map(sink => new InetSocketAddress("localhost", sink.getPort()))
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
eventsPerBatch, 5)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()
ssc.start()
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val executor = Executors.newCachedThreadPool()
val executorCompletion = new ExecutorCompletionService[Void](executor)
channels.map(channel => {
val latch = new CountDownLatch(batchCount * channels.size)
sinks.foreach(_.countdownWhenBatchReceived(latch))
channels.foreach(channel => {
executorCompletion.submit(new TxnSubmitter(channel, clock))
})
for (i <- 0 until channels.size) {
executorCompletion.take()
}
val startTime = System.currentTimeMillis()
while (outputBuffer.size < batchCount * channels.size &&
System.currentTimeMillis() - startTime < 15000) {
logInfo("output.size = " + outputBuffer.size)
Thread.sleep(100)
}
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < 15000, "Operation timed out after " + timeTaken + " ms")
logInfo("Stopping context")
ssc.stop()
latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
clock.advance(batchDuration.milliseconds)
// The eventually is required to ensure that all data in the batch has been processed.
eventually(timeout(10 seconds), interval(100 milliseconds)) {
val flattenedBuffer = outputBuffer.flatten
assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
var counter = 0
@@ -211,6 +202,8 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
}
assert(counter === totalEventsPerChannel * channels.size)
}
ssc.stop()
}
def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
@@ -234,7 +227,6 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
tx.commit()
tx.close()
Thread.sleep(500) // Allow some time for the events to reach
clock.advance(batchDuration.milliseconds)
}
null
}