[SPARK-1341] [Streaming] Throttle BlockGenerator to limit rate of data consumption.
Author: Issac Buenrostro <buenrostro@ooyala.com> Closes #945 from ibuenros/SPARK-1341-throttle and squashes the following commits: 5514916 [Issac Buenrostro] Formatting changes, added documentation for streaming throttling, stricter unit tests for throttling. 62f395f [Issac Buenrostro] Add comments and license to streaming RateLimiter.scala 7066438 [Issac Buenrostro] Moved throttle code to RateLimiter class, smoother pushing when throttling active ccafe09 [Issac Buenrostro] Throttle BlockGenerator to limit rate of data consumption.
This commit is contained in:
parent
40a8fef4e6
commit
2dd6724850
|
@ -773,6 +773,15 @@ Apart from these, the following properties are also available, and may be useful
|
|||
into blocks of data before storing them in Spark.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>spark.streaming.receiver.maxRate</code></td>
|
||||
<td>infinite</td>
|
||||
<td>
|
||||
Maximum rate (per second) at which each receiver will push data into blocks. Effectively,
|
||||
each stream will consume at most this number of records per second.
|
||||
Setting this configuration to 0 or a negative number will put no limit on the rate.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>spark.streaming.unpersist</code></td>
|
||||
<td>true</td>
|
||||
|
|
|
@ -44,7 +44,7 @@ private[streaming] class BlockGenerator(
|
|||
listener: BlockGeneratorListener,
|
||||
receiverId: Int,
|
||||
conf: SparkConf
|
||||
) extends Logging {
|
||||
) extends RateLimiter(conf) with Logging {
|
||||
|
||||
private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
|
||||
|
||||
|
@ -81,6 +81,7 @@ private[streaming] class BlockGenerator(
|
|||
* will be periodically pushed into BlockManager.
|
||||
*/
|
||||
def += (data: Any): Unit = synchronized {
|
||||
waitToPush()
|
||||
currentBuffer += data
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.streaming.receiver
|
||||
|
||||
import org.apache.spark.{Logging, SparkConf}
|
||||
import java.util.concurrent.TimeUnit._
|
||||
|
||||
/** Provides waitToPush() method to limit the rate at which receivers consume data.
|
||||
*
|
||||
* waitToPush method will block the thread if too many messages have been pushed too quickly,
|
||||
* and only return when a new message has been pushed. It assumes that only one message is
|
||||
* pushed at a time.
|
||||
*
|
||||
* The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
|
||||
* per second that each receiver will accept.
|
||||
*
|
||||
* @param conf spark configuration
|
||||
*/
|
||||
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
|
||||
|
||||
private var lastSyncTime = System.nanoTime
|
||||
private var messagesWrittenSinceSync = 0L
|
||||
private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
|
||||
private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
|
||||
|
||||
def waitToPush() {
|
||||
if( desiredRate <= 0 ) {
|
||||
return
|
||||
}
|
||||
val now = System.nanoTime
|
||||
val elapsedNanosecs = math.max(now - lastSyncTime, 1)
|
||||
val rate = messagesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs
|
||||
if (rate < desiredRate) {
|
||||
// It's okay to write; just update some variables and return
|
||||
messagesWrittenSinceSync += 1
|
||||
if (now > lastSyncTime + SYNC_INTERVAL) {
|
||||
// Sync interval has passed; let's resync
|
||||
lastSyncTime = now
|
||||
messagesWrittenSinceSync = 1
|
||||
}
|
||||
} else {
|
||||
// Calculate how much time we should sleep to bring ourselves to the desired rate.
|
||||
val targetTimeInMillis = messagesWrittenSinceSync * 1000 / desiredRate
|
||||
val elapsedTimeInMillis = elapsedNanosecs / 1000000
|
||||
val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
|
||||
if (sleepTimeInMillis > 0) {
|
||||
logTrace("Natural rate is " + rate + " per second but desired rate is " +
|
||||
desiredRate + ", sleeping for " + sleepTimeInMillis + " ms to compensate.")
|
||||
Thread.sleep(sleepTimeInMillis)
|
||||
}
|
||||
waitToPush()
|
||||
}
|
||||
}
|
||||
}
|
|
@ -145,6 +145,44 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
|
|||
assert(recordedData.toSet === generatedData.toSet)
|
||||
}
|
||||
|
||||
test("block generator throttling") {
|
||||
val blockGeneratorListener = new FakeBlockGeneratorListener
|
||||
val blockInterval = 50
|
||||
val maxRate = 200
|
||||
val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
|
||||
set("spark.streaming.receiver.maxRate", maxRate.toString)
|
||||
val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
|
||||
val expectedBlocks = 20
|
||||
val waitTime = expectedBlocks * blockInterval
|
||||
val expectedMessages = maxRate * waitTime / 1000
|
||||
val expectedMessagesPerBlock = maxRate * blockInterval / 1000
|
||||
val generatedData = new ArrayBuffer[Int]
|
||||
|
||||
// Generate blocks
|
||||
val startTime = System.currentTimeMillis()
|
||||
blockGenerator.start()
|
||||
var count = 0
|
||||
while(System.currentTimeMillis - startTime < waitTime) {
|
||||
blockGenerator += count
|
||||
generatedData += count
|
||||
count += 1
|
||||
Thread.sleep(1)
|
||||
}
|
||||
blockGenerator.stop()
|
||||
|
||||
val recordedData = blockGeneratorListener.arrayBuffers
|
||||
assert(blockGeneratorListener.arrayBuffers.size > 0)
|
||||
assert(recordedData.flatten.toSet === generatedData.toSet)
|
||||
// recordedData size should be close to the expected rate
|
||||
assert(recordedData.flatten.size >= expectedMessages * 0.9 &&
|
||||
recordedData.flatten.size <= expectedMessages * 1.1 )
|
||||
// the first and last block may be incomplete, so we slice them out
|
||||
recordedData.slice(1, recordedData.size - 1).foreach { block =>
|
||||
assert(block.size >= expectedMessagesPerBlock * 0.8 &&
|
||||
block.size <= expectedMessagesPerBlock * 1.2 )
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
|
||||
*/
|
||||
|
|
Loading…
Reference in a new issue