From 2dd67248503306bb08946b1796821e9f9ed4d00e Mon Sep 17 00:00:00 2001 From: Issac Buenrostro Date: Thu, 10 Jul 2014 16:01:08 -0700 Subject: [PATCH] [SPARK-1341] [Streaming] Throttle BlockGenerator to limit rate of data consumption. Author: Issac Buenrostro Closes #945 from ibuenros/SPARK-1341-throttle and squashes the following commits: 5514916 [Issac Buenrostro] Formatting changes, added documentation for streaming throttling, stricter unit tests for throttling. 62f395f [Issac Buenrostro] Add comments and license to streaming RateLimiter.scala 7066438 [Issac Buenrostro] Moved throttle code to RateLimiter class, smoother pushing when throttling active ccafe09 [Issac Buenrostro] Throttle BlockGenerator to limit rate of data consumption. --- docs/configuration.md | 9 +++ .../streaming/receiver/BlockGenerator.scala | 3 +- .../streaming/receiver/RateLimiter.scala | 69 +++++++++++++++++++ .../streaming/NetworkReceiverSuite.scala | 38 ++++++++++ 4 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala diff --git a/docs/configuration.md b/docs/configuration.md index b84104cc7e..0aea23ab59 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -773,6 +773,15 @@ Apart from these, the following properties are also available, and may be useful into blocks of data before storing them in Spark. + + spark.streaming.receiver.maxRate + infinite + + Maximum rate (per second) at which each receiver will push data into blocks. Effectively, + each stream will consume at most this number of records per second. + Setting this configuration to 0 or a negative number will put no limit on the rate. + + spark.streaming.unpersist true diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index 78cc2daa56..0316b6862f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -44,7 +44,7 @@ private[streaming] class BlockGenerator( listener: BlockGeneratorListener, receiverId: Int, conf: SparkConf - ) extends Logging { + ) extends RateLimiter(conf) with Logging { private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any]) @@ -81,6 +81,7 @@ private[streaming] class BlockGenerator( * will be periodically pushed into BlockManager. */ def += (data: Any): Unit = synchronized { + waitToPush() currentBuffer += data } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala new file mode 100644 index 0000000000..e4f6ba626e --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.receiver + +import org.apache.spark.{Logging, SparkConf} +import java.util.concurrent.TimeUnit._ + +/** Provides waitToPush() method to limit the rate at which receivers consume data. + * + * waitToPush method will block the thread if too many messages have been pushed too quickly, + * and only return when a new message has been pushed. It assumes that only one message is + * pushed at a time. + * + * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages + * per second that each receiver will accept. + * + * @param conf spark configuration + */ +private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { + + private var lastSyncTime = System.nanoTime + private var messagesWrittenSinceSync = 0L + private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0) + private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS) + + def waitToPush() { + if( desiredRate <= 0 ) { + return + } + val now = System.nanoTime + val elapsedNanosecs = math.max(now - lastSyncTime, 1) + val rate = messagesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs + if (rate < desiredRate) { + // It's okay to write; just update some variables and return + messagesWrittenSinceSync += 1 + if (now > lastSyncTime + SYNC_INTERVAL) { + // Sync interval has passed; let's resync + lastSyncTime = now + messagesWrittenSinceSync = 1 + } + } else { + // Calculate how much time we should sleep to bring ourselves to the desired rate. + val targetTimeInMillis = messagesWrittenSinceSync * 1000 / desiredRate + val elapsedTimeInMillis = elapsedNanosecs / 1000000 + val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis + if (sleepTimeInMillis > 0) { + logTrace("Natural rate is " + rate + " per second but desired rate is " + + desiredRate + ", sleeping for " + sleepTimeInMillis + " ms to compensate.") + Thread.sleep(sleepTimeInMillis) + } + waitToPush() + } + } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala index d9ac3c91f6..f4e11f975d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala @@ -145,6 +145,44 @@ class NetworkReceiverSuite extends FunSuite with Timeouts { assert(recordedData.toSet === generatedData.toSet) } + test("block generator throttling") { + val blockGeneratorListener = new FakeBlockGeneratorListener + val blockInterval = 50 + val maxRate = 200 + val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString). + set("spark.streaming.receiver.maxRate", maxRate.toString) + val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) + val expectedBlocks = 20 + val waitTime = expectedBlocks * blockInterval + val expectedMessages = maxRate * waitTime / 1000 + val expectedMessagesPerBlock = maxRate * blockInterval / 1000 + val generatedData = new ArrayBuffer[Int] + + // Generate blocks + val startTime = System.currentTimeMillis() + blockGenerator.start() + var count = 0 + while(System.currentTimeMillis - startTime < waitTime) { + blockGenerator += count + generatedData += count + count += 1 + Thread.sleep(1) + } + blockGenerator.stop() + + val recordedData = blockGeneratorListener.arrayBuffers + assert(blockGeneratorListener.arrayBuffers.size > 0) + assert(recordedData.flatten.toSet === generatedData.toSet) + // recordedData size should be close to the expected rate + assert(recordedData.flatten.size >= expectedMessages * 0.9 && + recordedData.flatten.size <= expectedMessages * 1.1 ) + // the first and last block may be incomplete, so we slice them out + recordedData.slice(1, recordedData.size - 1).foreach { block => + assert(block.size >= expectedMessagesPerBlock * 0.8 && + block.size <= expectedMessagesPerBlock * 1.2 ) + } + } + /** * An implementation of NetworkReceiver that is used for testing a receiver's life cycle. */