[SPARK-8367] [STREAMING] Add a limit for `spark.streaming.blockInterval` to prevent a data loss bug.

The bug was reported in JIRA ticket [SPARK-8367](https://issues.apache.org/jira/browse/SPARK-8367).
The resolution is to restrict the configuration `spark.streaming.blockInterval` to a positive value.
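
A minimal sketch of the added guard in isolation (the standalone object and its name are illustrative, not part of the patch; `SparkConf.getTimeAsMs` is the same accessor `BlockGenerator` uses):

    import org.apache.spark.SparkConf

    object BlockIntervalGuard {
      def main(args: Array[String]): Unit = {
        // A value such as "0ms" previously slipped through and could lose received data.
        val conf = new SparkConf().set("spark.streaming.blockInterval", "0ms")
        val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
        // The patch fails fast with IllegalArgumentException instead of dropping data.
        require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")
      }
    }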

Author: huangzhaowei <carlmartinmax@gmail.com>
Author: huangzhaowei <SaintBacchus@users.noreply.github.com>

Closes #6818 from SaintBacchus/SPARK-8367 and squashes the following commits:

c9d1927 [huangzhaowei] Update BlockGenerator.scala
bd3f71a [huangzhaowei] Use require instead of if
3d17796 [huangzhaowei] [SPARK-8367][Streaming] Add a limit for 'spark.streaming.blockInterval' since a data loss bug.
Authored by huangzhaowei on 2015-06-16 08:16:09 +02:00, committed by Sean Owen
parent bc76a0f750
commit ccf010f27b


@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.util.SystemClock
 
 /** Listener object for BlockGenerator events */
 private[streaming] trait BlockGeneratorListener {
@@ -80,6 +80,8 @@ private[streaming] class BlockGenerator(
 
   private val clock = new SystemClock()
   private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
+  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")
+
   private val blockIntervalTimer =
     new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
   private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
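
For illustration, a hedged sketch of how the new check surfaces to a streaming job (standard Spark Streaming API; since the guard sits in the `BlockGenerator` constructor, the assumed failure point is receiver startup):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object FailFastDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("block-interval-check")
          .set("spark.streaming.blockInterval", "0ms") // invalid after this patch
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.socketTextStream("localhost", 9999).print()
        // Receiver startup now aborts with:
        //   java.lang.IllegalArgumentException: requirement failed:
        //   'spark.streaming.blockInterval' should be a positive value
        ssc.start()
        ssc.awaitTermination()
      }
    }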