[SPARK-13170][STREAMING] Investigate replacing SynchronizedQueue as it is deprecated
Replace SynchronizedQueue with synchronized access to a Queue. Author: Sean Owen <sowen@cloudera.com>. Closes #11111 from srowen/SPARK-13170.
This commit is contained in:
parent
e30121afac
commit
68ed3632c5
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.spark.examples.streaming
|
||||
|
||||
import scala.collection.mutable.SynchronizedQueue
|
||||
import scala.collection.mutable.Queue
|
||||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.rdd.RDD
|
||||
|
@ -34,7 +34,7 @@ object QueueStream {
|
|||
|
||||
// Create the queue through which RDDs can be pushed to
|
||||
// a QueueInputDStream
|
||||
val rddQueue = new SynchronizedQueue[RDD[Int]]()
|
||||
val rddQueue = new Queue[RDD[Int]]()
|
||||
|
||||
// Create the QueueInputDStream and use it to do some processing
|
||||
val inputStream = ssc.queueStream(rddQueue)
|
||||
|
@ -45,7 +45,9 @@ object QueueStream {
|
|||
|
||||
// Create and push some RDDs into the queue
|
||||
for (i <- 1 to 30) {
|
||||
rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
|
||||
rddQueue.synchronized {
|
||||
rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
|
||||
}
|
||||
Thread.sleep(1000)
|
||||
}
|
||||
ssc.stop()
|
||||
|
|
|
@ -459,7 +459,7 @@ class StreamingContext private[streaming] (
|
|||
* NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
|
||||
* those RDDs, so `queueStream` doesn't support checkpointing.
|
||||
*
|
||||
* @param queue Queue of RDDs
|
||||
* @param queue Queue of RDDs. Modifications to this data structure must be synchronized.
|
||||
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
|
||||
* @tparam T Type of objects in the RDD
|
||||
*/
|
||||
|
@ -477,7 +477,7 @@ class StreamingContext private[streaming] (
|
|||
* NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
|
||||
* those RDDs, so `queueStream` doesn't support checkpointing.
|
||||
*
|
||||
* @param queue Queue of RDDs
|
||||
* @param queue Queue of RDDs. Modifications to this data structure must be synchronized.
|
||||
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
|
||||
* @param defaultRDD Default RDD is returned by the DStream when the queue is empty.
|
||||
* Set as null if no RDD should be returned when empty
|
||||
|
|
|
@ -48,12 +48,15 @@ class QueueInputDStream[T: ClassTag](
|
|||
|
||||
override def compute(validTime: Time): Option[RDD[T]] = {
|
||||
val buffer = new ArrayBuffer[RDD[T]]()
|
||||
if (oneAtATime && queue.size > 0) {
|
||||
buffer += queue.dequeue()
|
||||
} else {
|
||||
buffer ++= queue.dequeueAll(_ => true)
|
||||
queue.synchronized {
|
||||
if (oneAtATime && queue.nonEmpty) {
|
||||
buffer += queue.dequeue()
|
||||
} else {
|
||||
buffer ++= queue
|
||||
queue.clear()
|
||||
}
|
||||
}
|
||||
if (buffer.size > 0) {
|
||||
if (buffer.nonEmpty) {
|
||||
if (oneAtATime) {
|
||||
Some(buffer.head)
|
||||
} else {
|
||||
|
|
|
@ -24,7 +24,7 @@ import java.util.concurrent._
|
|||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable.SynchronizedQueue
|
||||
import scala.collection.mutable
|
||||
import scala.language.postfixOps
|
||||
|
||||
import com.google.common.io.Files
|
||||
|
@ -40,7 +40,6 @@ import org.apache.spark.storage.StorageLevel
|
|||
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
|
||||
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
|
||||
import org.apache.spark.streaming.receiver.Receiver
|
||||
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
|
||||
import org.apache.spark.util.{ManualClock, Utils}
|
||||
|
||||
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
||||
|
@ -67,7 +66,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
// Feed data to the server to send to the network receiver
|
||||
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
|
||||
val expectedOutput = input.map(_.toString)
|
||||
for (i <- 0 until input.size) {
|
||||
for (i <- input.indices) {
|
||||
testServer.send(input(i).toString + "\n")
|
||||
Thread.sleep(500)
|
||||
clock.advance(batchDuration.milliseconds)
|
||||
|
@ -102,8 +101,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
// Verify whether all the elements received are as expected
|
||||
// (whether the elements were received one in each interval is not verified)
|
||||
val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
|
||||
assert(output.size === expectedOutput.size)
|
||||
for (i <- 0 until output.size) {
|
||||
assert(output.length === expectedOutput.size)
|
||||
for (i <- output.indices) {
|
||||
assert(output(i) === expectedOutput(i))
|
||||
}
|
||||
}
|
||||
|
@ -242,11 +241,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
val input = Seq("1", "2", "3", "4", "5")
|
||||
val expectedOutput = input.map(Seq(_))
|
||||
val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
|
||||
def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
|
||||
def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)
|
||||
|
||||
// Set up the streaming context and input streams
|
||||
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
|
||||
val queue = new SynchronizedQueue[RDD[String]]()
|
||||
val queue = new mutable.Queue[RDD[String]]()
|
||||
val queueStream = ssc.queueStream(queue, oneAtATime = true)
|
||||
val outputStream = new TestOutputStream(queueStream, outputQueue)
|
||||
outputStream.register()
|
||||
|
@ -256,9 +255,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
|
||||
|
||||
val inputIterator = input.toIterator
|
||||
for (i <- 0 until input.size) {
|
||||
for (i <- input.indices) {
|
||||
// Enqueue more than 1 item per tick but they should dequeue one at a time
|
||||
inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
|
||||
inputIterator.take(2).foreach { i =>
|
||||
queue.synchronized {
|
||||
queue += ssc.sparkContext.makeRDD(Seq(i))
|
||||
}
|
||||
}
|
||||
clock.advance(batchDuration.milliseconds)
|
||||
}
|
||||
Thread.sleep(1000)
|
||||
|
@ -281,13 +284,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
|
||||
test("queue input stream - oneAtATime = false") {
|
||||
val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
|
||||
def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
|
||||
def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)
|
||||
val input = Seq("1", "2", "3", "4", "5")
|
||||
val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
|
||||
|
||||
// Set up the streaming context and input streams
|
||||
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
|
||||
val queue = new SynchronizedQueue[RDD[String]]()
|
||||
val queue = new mutable.Queue[RDD[String]]()
|
||||
val queueStream = ssc.queueStream(queue, oneAtATime = false)
|
||||
val outputStream = new TestOutputStream(queueStream, outputQueue)
|
||||
outputStream.register()
|
||||
|
@ -298,12 +301,20 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
|
||||
// Enqueue the first 3 items (one by one), they should be merged in the next batch
|
||||
val inputIterator = input.toIterator
|
||||
inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
|
||||
inputIterator.take(3).foreach { i =>
|
||||
queue.synchronized {
|
||||
queue += ssc.sparkContext.makeRDD(Seq(i))
|
||||
}
|
||||
}
|
||||
clock.advance(batchDuration.milliseconds)
|
||||
Thread.sleep(1000)
|
||||
|
||||
// Enqueue the remaining items (again one by one), merged in the final batch
|
||||
inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
|
||||
inputIterator.foreach { i =>
|
||||
queue.synchronized {
|
||||
queue += ssc.sparkContext.makeRDD(Seq(i))
|
||||
}
|
||||
}
|
||||
clock.advance(batchDuration.milliseconds)
|
||||
Thread.sleep(1000)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue