Merge pull request #365 from ryanlecompte/rate_limiter_timing_cleanup
Improve sleeping algorithm for rate limiting output streams
This commit is contained in:
commit
530493b0e8
|
@ -1,9 +1,14 @@
|
|||
package spark.util
|
||||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
import java.io.OutputStream
|
||||
import java.util.concurrent.TimeUnit._
|
||||
|
||||
class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream {
|
||||
var lastSyncTime = System.nanoTime()
|
||||
val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
|
||||
val CHUNK_SIZE = 8192
|
||||
var lastSyncTime = System.nanoTime
|
||||
var bytesWrittenSinceSync: Long = 0
|
||||
|
||||
override def write(b: Int) {
|
||||
|
@ -15,34 +20,13 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu
|
|||
write(bytes, 0, bytes.length)
|
||||
}
|
||||
|
||||
/**
 * Writes `length` bytes from `bytes`, starting at index `offset`, to the underlying stream.
 *
 * The data is emitted in pieces of at most `CHUNK_SIZE` bytes. Before each piece is handed to
 * `out`, `waitToWrite` is called with the piece size so the rate limiter can delay the write.
 *
 * @param bytes  source buffer
 * @param offset index in `bytes` of the first byte to write
 * @param length number of bytes to write (a count, per the java.io.OutputStream contract --
 *               not an end index)
 */
@tailrec
override final def write(bytes: Array[Byte], offset: Int, length: Int): Unit = {
  // Size the chunk from the remaining byte count. Using `length - offset` here would treat
  // `length` as an end index and write the wrong range whenever offset != 0.
  val writeSize = math.min(length, CHUNK_SIZE)
  if (writeSize > 0) {
    waitToWrite(writeSize)
    out.write(bytes, offset, writeSize)
    // Advance the start and shrink the remaining count by what was just written.
    write(bytes, offset + writeSize, length - writeSize)
  }
}
|
||||
|
||||
|
@ -53,4 +37,26 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu
|
|||
override def close() {
|
||||
out.close()
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Blocks until `numBytes` more bytes may be written without the average rate since
 * `lastSyncTime` reaching `bytesPerSec`, then records those bytes as written.
 *
 * Elapsed time is kept at nanosecond precision. Truncating it to whole seconds makes the
 * rate `0.0 / 0` (NaN) or infinite during the first second after a sync, and makes the
 * computed sleep time zero almost always, which turns the wait into a busy spin.
 *
 * @param numBytes size of the chunk the caller is about to write
 */
@tailrec
private def waitToWrite(numBytes: Int): Unit = {
  val now = System.nanoTime
  // Clamp to 1 ns so the division below can never be by zero.
  val elapsedNanos = math.max(now - lastSyncTime, 1)
  val nanosPerSec = NANOSECONDS.convert(1, SECONDS)
  val rate = bytesWrittenSinceSync.toDouble * nanosPerSec / elapsedNanos
  if (rate < bytesPerSec) {
    // It's okay to write; account for the bytes and return.
    bytesWrittenSinceSync += numBytes
    if (now > lastSyncTime + SYNC_INTERVAL) {
      // Sync interval has passed; restart the averaging window at this write.
      lastSyncTime = now
      bytesWrittenSinceSync = numBytes
    }
  } else {
    // Sleep for the time still needed for the bytes already written to average out to
    // the desired rate: (time those bytes should have taken) - (time actually elapsed).
    val targetMillis = bytesWrittenSinceSync * 1000 / bytesPerSec
    val elapsedMillis = MILLISECONDS.convert(elapsedNanos, NANOSECONDS)
    // Always sleep at least 1 ms so a sub-millisecond remainder does not busy-spin.
    Thread.sleep(math.max(targetMillis - elapsedMillis, 1L))
    waitToWrite(numBytes)
  }
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
package spark.util
|
||||
|
||||
import org.scalatest.FunSuite
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.util.concurrent.TimeUnit._
|
||||
|
||||
/** Checks that RateLimitedOutputStream throttles writes and passes the data through intact. */
class RateLimitedOutputStreamSuite extends FunSuite {

  /** Evaluates `f` once and returns the elapsed wall-clock time in nanoseconds. */
  private def benchmark[U](f: => U): Long = {
    val start = System.nanoTime
    f
    System.nanoTime - start
  }

  test("write") {
    val underlying = new ByteArrayOutputStream
    val data = "X" * 41000
    val stream = new RateLimitedOutputStream(underlying, 10000)
    val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) }
    val seconds = SECONDS.convert(elapsedNs, NANOSECONDS)
    // 41000 bytes at 10000 bytes/sec cannot finish in under 4 whole seconds. Asserting an
    // exact value would make the test depend on scheduler timing, so only the lower bound
    // is strict; the upper bound just catches a stalled writer.
    assert(seconds >= 4, "Seconds value (" + seconds + ") is less than 4.")
    assert(seconds <= 30, "Took more than 30 seconds (" + seconds + ") to write data.")
    assert(underlying.toString("UTF-8") == data)
  }
}
|
Loading…
Reference in a new issue