Merge pull request #247 from aarondav/minor

Increase spark.akka.askTimeout default to 30 seconds

In experimental clusters we've observed that a 10-second timeout was insufficient, despite a low number of nodes and a relatively small workload (16 nodes, <1.5 TB data). This would cause an entire job to fail at the beginning of the reduce phase.
There is no particular reason for this value to be small, since a timeout should only occur in an exceptional situation.

Also centralized the reading of spark.akka.askTimeout into AkkaUtils (surely this can later be cleaned up to use Typesafe Config).

Finally, deleted some lurking implicits. If anyone can think of a reason they should still be there, please let me know.
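For reference, a minimal sketch of the calling pattern the call sites converge on after this change (the AskExample object and RequestStatus message are hypothetical; AkkaUtils.askTimeout and the explicit-timeout ask mirror the diff below):

    import scala.concurrent.Await
    import akka.actor.ActorRef
    import akka.pattern.ask
    import org.apache.spark.util.AkkaUtils

    object AskExample {
      // Hypothetical request message, for illustration only.
      case object RequestStatus

      // Before: each call site built its own timeout, e.g.
      //   Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
      // often held as an implicit that `ask` picked up silently.
      // After: read the shared default (now 30 seconds) once and pass it
      // explicitly to both `ask` and `Await.result`.
      def queryStatus(actor: ActorRef): Any = {
        val timeout = AkkaUtils.askTimeout
        val future = actor.ask(RequestStatus)(timeout)
        Await.result(future, timeout)
      }
    }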
Reynold Xin committed 2013-12-18 22:22:21 -08:00
commit bfba5323be
13 changed files with 48 additions and 71 deletions


@@ -29,8 +29,7 @@ import akka.pattern.ask
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
@@ -53,7 +52,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
private[spark] class MapOutputTracker extends Logging {
private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
private val timeout = AkkaUtils.askTimeout
// Set to the MapOutputTrackerActor living on the driver
var trackerActor: Either[ActorRef, ActorSelection] = _


@@ -23,14 +23,14 @@ import scala.concurrent.duration._
import scala.concurrent.Await
import akka.actor._
import akka.pattern.AskTimeoutException
import akka.pattern.ask
import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.AkkaUtils
/**
@@ -178,7 +178,7 @@ private[spark] class Client(
def stop() {
if (actor != null) {
try {
val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val timeout = AkkaUtils.askTimeout
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {


@@ -18,19 +18,16 @@
package org.apache.spark.deploy.master
import java.text.SimpleDateFormat
import java.util.concurrent.TimeUnit
import java.util.Date
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor._
import akka.pattern.ask
import akka.remote._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
import akka.util.Timeout
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -38,7 +35,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
import context.dispatcher
@@ -537,12 +534,10 @@ private[spark] object Master {
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
val timeoutDuration: FiniteDuration = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
implicit val timeout = Timeout(timeoutDuration)
val respFuture = actor ? RequestWebUIPort // ask pattern
val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse]
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
val timeout = AkkaUtils.askTimeout
val respFuture = actor.ask(RequestWebUIPort)(timeout)
val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
(actorSystem, boundPort, resp.webUIBoundPort)
}
}


@@ -17,32 +17,28 @@
package org.apache.spark.deploy.master.ui
import scala.concurrent.Await
import scala.xml.Node
import akka.pattern.ask
import scala.concurrent.Await
import scala.concurrent.duration._
import javax.servlet.http.HttpServletRequest
import net.liftweb.json.JsonAST.JValue
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
private[spark] class ApplicationPage(parent: MasterWebUI) {
val master = parent.masterActorRef
implicit val timeout = parent.timeout
val timeout = parent.timeout
/** Executor details for a particular application */
def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val state = Await.result(stateFuture, timeout)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})
@@ -53,7 +49,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val state = Await.result(stateFuture, timeout)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})


@@ -17,37 +17,33 @@
package org.apache.spark.deploy.master.ui
import javax.servlet.http.HttpServletRequest
import scala.concurrent.Await
import scala.xml.Node
import scala.concurrent.Await
import akka.pattern.ask
import scala.concurrent.duration._
import javax.servlet.http.HttpServletRequest
import net.liftweb.json.JsonAST.JValue
import org.apache.spark.deploy.DeployWebUI
import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.masterActorRef
implicit val timeout = parent.timeout
val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val state = Await.result(stateFuture, timeout)
JsonProtocol.writeMasterState(state)
}
/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val state = Await.result(stateFuture, timeout)
val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
val workers = state.workers.sortBy(_.id)


@@ -17,25 +17,21 @@
package org.apache.spark.deploy.master.ui
import scala.concurrent.duration._
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
import org.apache.spark.{Logging}
import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.JettyUtils
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Web UI server for the standalone master.
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
implicit val timeout = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val timeout = AkkaUtils.askTimeout
val host = Utils.localHostName()
val port = requestedPort


@@ -42,13 +42,13 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, 30 seconds)
val workerState = Await.result(stateFuture, timeout)
JsonProtocol.writeWorkerState(workerState)
}
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, 30 seconds)
val workerState = Await.result(stateFuture, timeout)
val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
val runningExecutorTable =


@@ -19,17 +19,14 @@ package org.apache.spark.deploy.worker.ui
import java.io.File
import scala.concurrent.duration._
import akka.util.Timeout
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
import org.apache.spark.Logging
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{JettyUtils, UIUtils}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
import org.eclipse.jetty.server.{Handler, Server}
import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Web UI server for the standalone worker.
@@ -37,8 +34,7 @@ import org.eclipse.jetty.server.{Handler, Server}
private[spark]
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends Logging {
implicit val timeout = Timeout(
Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
val timeout = AkkaUtils.askTimeout
val host = Utils.localHostName()
val port = requestedPort.getOrElse(
System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)


@@ -27,10 +27,10 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{SparkException, Logging, TaskState}
import org.apache.spark.{Logging, SparkException, TaskState}
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.Utils
import org.apache.spark.util.{AkkaUtils, Utils}
/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -47,6 +47,8 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
private val timeout = AkkaUtils.askTimeout
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
private val executorAddress = new HashMap[String, Address]
@@ -172,10 +174,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
private val timeout = {
Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
}
def stopExecutors() {
try {
if (driverActor != null) {


@@ -18,7 +18,6 @@
package org.apache.spark.storage
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._
@@ -26,15 +25,17 @@ import akka.pattern.ask
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.AkkaUtils
private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
private[spark]
class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val timeout = AkkaUtils.askTimeout
/** Remove a dead executor from the driver actor. This is only called on the driver side. */
def removeExecutor(execId: String) {


@@ -21,17 +21,15 @@ import java.util.{HashMap => JHashMap}
import scala.collection.mutable
import scala.collection.JavaConversions._
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef, Cancellable}
import akka.pattern.ask
import scala.concurrent.duration._
import scala.concurrent.Future
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.Utils
import org.apache.spark.util.{AkkaUtils, Utils}
/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
@@ -50,8 +48,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
val akkaTimeout = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
private val akkaTimeout = AkkaUtils.askTimeout
initLogging()


@@ -28,9 +28,6 @@ import org.apache.spark.ui.JettyUtils._
/** Web UI showing storage status of all RDD's in the given SparkContext. */
private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
implicit val timeout = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val indexPage = new IndexPage(this)
val rddPage = new RDDPage(this)


@@ -17,6 +17,8 @@
package org.apache.spark.util
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
@@ -84,4 +86,8 @@ private[spark] object AkkaUtils {
(actorSystem, boundPort)
}
/** Returns the default Spark timeout to use for Akka ask operations. */
def askTimeout: FiniteDuration = {
Duration.create(System.getProperty("spark.akka.askTimeout", "30").toLong, "seconds")
}
}
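
The 30-second default can still be overridden per deployment through the same system property read above; a minimal example (the value 60 is illustrative):

    // Set before the actor systems are created, or pass -Dspark.akka.askTimeout=60 to the JVM.
    System.setProperty("spark.akka.askTimeout", "60")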