Fine-tuned defaults for Akka and restored tracking of disassociated events, since they are delivered when a remote TCP socket is closed. Also increased the transport failure heartbeat interval, as it is mostly unneeded now that we rely on remote death watch instead.

This commit is contained in:
Prashant Sharma 2013-11-22 19:46:39 +05:30
parent 95d8dbce91
commit 77929cfeed
5 changed files with 40 additions and 21 deletions

View file

@ -17,8 +17,9 @@
package org.apache.spark.deploy.master
import java.util.Date
import java.text.SimpleDateFormat
import java.util.concurrent.TimeUnit
import java.util.Date
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
@ -28,6 +29,7 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor._
import akka.pattern.ask
import akka.remote._
import akka.serialization.SerializationExtension
import akka.util.Timeout
import org.apache.spark.{Logging, SparkException}
@ -40,11 +42,6 @@ import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
import org.apache.spark.deploy.DeployMessages.KillExecutor
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import scala.Some
import akka.actor.Terminated
import akka.serialization.SerializationExtension
import java.util.concurrent.TimeUnit
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
import context.dispatcher
@ -102,6 +99,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
@ -267,11 +265,20 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case Terminated(actor) => {
// The disconnected actor could've been either a worker or an app; remove whichever of
// those we have an entry for in the corresponding actor hashmap
logInfo(s"$actor got terminated, removing it.")
actorToWorker.get(actor).foreach(removeWorker)
actorToApp.get(actor).foreach(finishApplication)
if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case DisassociatedEvent(_, address, _) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
logInfo(s"$address got disassociated, removing it.")
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RequestMasterState => {
sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
state)
@ -431,6 +438,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
exec.id, ExecutorState.LOST, Some("worker lost"), None)
exec.application.removeExecutor(exec)
}
context.stop(worker.actor)
context.unwatch(worker.actor)
persistenceEngine.removeWorker(worker)
}
@ -493,6 +502,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
app.driver ! ApplicationRemoved(state.toString)
}
persistenceEngine.removeApplication(app)
context.stop(app.driver)
context.unwatch(app.driver)
schedule()
}
}

View file

@ -17,14 +17,16 @@
package org.apache.spark.deploy.worker
import java.io.File
import java.text.SimpleDateFormat
import java.util.Date
import java.io.File
import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import akka.actor._
import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.Logging
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
@ -36,10 +38,8 @@ import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
import org.apache.spark.deploy.DeployMessages.KillExecutor
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import scala.Some
import org.apache.spark.deploy.DeployMessages.Heartbeat
import org.apache.spark.deploy.DeployMessages.RegisteredWorker
import akka.remote.DisassociatedEvent
import org.apache.spark.deploy.DeployMessages.LaunchExecutor
import org.apache.spark.deploy.DeployMessages.RegisterWorker
@ -124,7 +124,7 @@ private[spark] class Worker(
logInfo("Spark home: " + sparkHome)
createWorkDir()
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
registerWithMaster()
@ -249,6 +249,10 @@ private[spark] class Worker(
logInfo(s"$actor_ terminated !")
masterDisconnected()
case x: DisassociatedEvent =>
logInfo(s"$x Disassociated !")
masterDisconnected()
case RequestWorkerState => {
sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
finishedExecutors.values.toList, activeMasterUrl, cores, memory,

View file

@ -26,11 +26,6 @@ import org.apache.spark.Logging
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{Utils, AkkaUtils}
import akka.remote.DisassociatedEvent
import akka.remote.AssociationErrorEvent
import akka.remote.DisassociatedEvent
import akka.actor.Terminated
private[spark] class CoarseGrainedExecutorBackend(
driverUrl: String,
@ -82,7 +77,11 @@ private[spark] class CoarseGrainedExecutorBackend(
}
case Terminated(actor) =>
logError(s"Driver $actor terminated or disconnected! Shutting down.")
logError(s"Driver $actor terminated, Shutting down.")
System.exit(1)
case x: DisassociatedEvent =>
logError(s"Driver $x disassociated! Shutting down.")
System.exit(1)
case StopExecutor =>

View file

@ -121,6 +121,9 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
case Terminated(actor) =>
actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
case DisassociatedEvent(_, address, _) =>
addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
}
// Make fake resource offers on all executors

View file

@ -44,9 +44,11 @@ private[spark] object AkkaUtils {
val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "30").toInt
val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "30").toInt
val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "60").toInt
val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble
// Since we have our own heartbeat mechanism and TCP already tracks connections,
// this detector adds little value, so a relatively large interval suffices.
val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
val akkaConf = ConfigFactory.parseString(
s"""
@ -56,8 +58,8 @@ private[spark] object AkkaUtils {
|akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
|akka.remote.watch-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
|akka.remote.watch-failure-detector.threshold = $akkaFailureDetector
|akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
|akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
|akka.remote.transport-failure-detector.heartbeat-interval = 30 s
|akka.remote.transport-failure-detector.acceptable-heartbeat-pause = ${akkaHeartBeatPauses + 10} s
|akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"