[SPARK-11627] Add initial input rate limit for spark streaming backpressure mechanism.
https://issues.apache.org/jira/browse/SPARK-11627 The Spark Streaming backpressure mechanism has no initial input rate limit, which might cause an OOM exception. In the first batch task, receivers receive data at the maximum speed they can reach, which might exhaust the executors' memory resources. Adding an initial input rate limit value makes sure the streaming job executes successfully in the first batch; the backpressure mechanism can then adjust the receiving rate adaptively. Author: junhao <junhao@mogujie.com> Closes #9593 from junhaoMg/junhao-dev.
This commit is contained in:
parent
5f37aad48c
commit
7218c0eba9
|
@ -1470,6 +1470,14 @@ Apart from these, the following properties are also available, and may be useful
|
|||
if they are set (see below).
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>spark.streaming.backpressure.initialRate</code></td>
|
||||
<td>not set</td>
|
||||
<td>
|
||||
This is the initial maximum receiving rate at which each receiver will receive data for the
|
||||
first batch when the backpressure mechanism is enabled.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>spark.streaming.blockInterval</code></td>
|
||||
<td>200ms</td>
|
||||
|
|
|
@ -36,7 +36,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
|
|||
|
||||
// treated as an upper limit
|
||||
private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
|
||||
private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble)
|
||||
private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
|
||||
|
||||
def waitToPush() {
|
||||
rateLimiter.acquire()
|
||||
|
@ -61,4 +61,11 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
|
|||
rateLimiter.setRate(newRate)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the initial rate limit used to initialize the rateLimiter
|
||||
*/
|
||||
private def getInitialRateLimit(): Long = {
  // Read the configured initial rate; when the key is absent, fall back to maxRateLimit.
  val configuredInitialRate =
    conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit)
  // Cap the result so the initial rate never exceeds maxRateLimit.
  configuredInitialRate.min(maxRateLimit)
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue