[SPARK-4688] Have a single shared network timeout in Spark

[SPARK-4688] Have a single shared network timeout in Spark

Author: Varun Saxena <vsaxena.varun@gmail.com>
Author: varunsaxena <vsaxena.varun@gmail.com>

Closes #3562 from varunsaxena/SPARK-4688 and squashes the following commits:

6e97f72 [Varun Saxena] [SPARK-4688] Single shared network timeout
cd783a2 [Varun Saxena] SPARK-4688
d6f8c29 [Varun Saxena] SCALA-4688
9562b15 [Varun Saxena] SPARK-4688
a75f014 [varunsaxena] SPARK-4688
594226c [varunsaxena] SPARK-4688
This commit is contained in:
Varun Saxena 2015-01-05 10:32:37 -08:00 committed by Reynold Xin
parent 5c506cecb9
commit d3f07fd23c
5 changed files with 21 additions and 5 deletions

View file

@ -81,7 +81,8 @@ private[nio] class ConnectionManager(
private val ackTimeoutMonitor =
new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))
private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)
private val ackTimeout =
conf.getInt("spark.core.connection.ack.wait.timeout", conf.getInt("spark.network.timeout", 100))
// Get the thread counts from the Spark Configuration.
//

View file

@ -52,8 +52,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
private val akkaTimeout = AkkaUtils.askTimeout(conf)
val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs",
math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000))
// Block manager slave timeout, in milliseconds.
// Resolution order:
//   1. spark.storage.blockManagerSlaveTimeoutMs, if set (already in ms);
//   2. otherwise spark.network.timeout (in seconds), converted to ms;
//   3. otherwise max(3 * spark.executor.heartbeatInterval, 45000) ms, truncated to whole
//      seconds by the integer division below.
val slaveTimeout = {
  val defaultMs = math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000)
  val networkTimeout = conf.getInt("spark.network.timeout", defaultMs / 1000)
  // Multiply as Long: Int * 1000 silently wraps for timeouts above ~2.1 million seconds,
  // which would turn a very large configured timeout into a tiny or negative one.
  conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", networkTimeout * 1000L)
}
val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)

View file

@ -65,7 +65,7 @@ private[spark] object AkkaUtils extends Logging {
val akkaThreads = conf.getInt("spark.akka.threads", 4)
val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
val akkaTimeout = conf.getInt("spark.akka.timeout", conf.getInt("spark.network.timeout", 100))
val akkaFrameSize = maxFrameSizeBytes(conf)
val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"

View file

@ -818,6 +818,16 @@ Apart from these, the following properties are also available, and may be useful
Communication timeout between Spark nodes, in seconds.
</td>
</tr>
<tr>
<td><code>spark.network.timeout</code></td>
<td>100</td>
<td>
Default timeout for all network interactions, in seconds. This config will be used in
place of <code>spark.core.connection.ack.wait.timeout</code>, <code>spark.akka.timeout</code>,
<code>spark.storage.blockManagerSlaveTimeoutMs</code> or <code>spark.shuffle.io.connectionTimeout</code>,
if they are not configured.
</td>
</tr>
<tr>
<td><code>spark.akka.heartbeat.pauses</code></td>
<td>6000</td>

View file

@ -37,7 +37,9 @@ public class TransportConf {
/** Connect timeout in milliseconds. Defaults to spark.network.timeout (100 secs) if unset. */
public int connectionTimeoutMs() {
return conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000;
int timeout =
conf.getInt("spark.shuffle.io.connectionTimeout", conf.getInt("spark.network.timeout", 100));
return timeout * 1000;
}
/** Number of concurrent connections between two nodes for fetching data. */