From 77929cfeed95905106f5b3891e8de1b1c312d119 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Fri, 22 Nov 2013 19:46:39 +0530
Subject: [PATCH] Fine tuning defaults for akka and restored tracking of
 disassociated events, as they are delivered when a remote TCP socket is
 closed. Also gave the transport failure detector a larger heartbeat
 interval, since it is mostly unneeded now that we rely on remote death
 watch instead.

---
 .../apache/spark/deploy/master/Master.scala | 23 ++++++++++++++-----
 .../apache/spark/deploy/worker/Worker.scala | 12 ++++++----
 .../CoarseGrainedExecutorBackend.scala      | 11 ++++-----
 .../CoarseGrainedSchedulerBackend.scala     |  3 +++
 .../org/apache/spark/util/AkkaUtils.scala   | 12 ++++++----
 5 files changed, 40 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a7cfc256a9..25f5927128 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.deploy.master
 
-import java.util.Date
 import java.text.SimpleDateFormat
+import java.util.concurrent.TimeUnit
+import java.util.Date
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Await
@@ -28,6 +29,7 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
 import akka.actor._
 import akka.pattern.ask
 import akka.remote._
+import akka.serialization.SerializationExtension
 import akka.util.Timeout
 
 import org.apache.spark.{Logging, SparkException}
@@ -40,11 +42,6 @@ import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
 import org.apache.spark.deploy.DeployMessages.KillExecutor
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import scala.Some
-import akka.actor.Terminated
-import akka.serialization.SerializationExtension
-import java.util.concurrent.TimeUnit
-
 
 private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
   import context.dispatcher
@@ -102,6 +99,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   override def preStart() {
     logInfo("Starting Spark master at " + masterUrl)
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
+    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     webUi.start()
     masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
@@ -267,11 +265,20 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     case Terminated(actor) => {
       // The disconnected actor could've been either a worker or an app; remove whichever of
       // those we have an entry for in the corresponding actor hashmap
+      logInfo(s"$actor got terminated, removing it.")
       actorToWorker.get(actor).foreach(removeWorker)
       actorToApp.get(actor).foreach(finishApplication)
       if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
     }
 
+    case DisassociatedEvent(_, address, _) => {
+      // The disconnected client could've been either a worker or an app; remove whichever it was
+      logInfo(s"$address got disassociated, removing it.")
+      addressToWorker.get(address).foreach(removeWorker)
+      addressToApp.get(address).foreach(finishApplication)
+      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
+    }
+
     case RequestMasterState => {
       sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
         state)
@@ -431,6 +438,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         exec.id, ExecutorState.LOST, Some("worker lost"), None)
       exec.application.removeExecutor(exec)
     }
+    context.stop(worker.actor)
+    context.unwatch(worker.actor)
     persistenceEngine.removeWorker(worker)
   }
 
@@ -493,6 +502,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       app.driver ! ApplicationRemoved(state.toString)
     }
     persistenceEngine.removeApplication(app)
+    context.stop(app.driver)
+    context.unwatch(app.driver)
     schedule()
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 9472c9a619..3a7d0b859b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.deploy.worker
 
+import java.io.File
 import java.text.SimpleDateFormat
 import java.util.Date
-import java.io.File
 
 import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 
 import akka.actor._
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+
 import org.apache.spark.Logging
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
@@ -36,10 +38,8 @@ import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
 import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
 import org.apache.spark.deploy.DeployMessages.KillExecutor
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import scala.Some
 import org.apache.spark.deploy.DeployMessages.Heartbeat
 import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import akka.remote.DisassociatedEvent
 import org.apache.spark.deploy.DeployMessages.LaunchExecutor
 import org.apache.spark.deploy.DeployMessages.RegisterWorker
 
@@ -124,7 +124,7 @@ private[spark] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
-
+    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     webUi.start()
     registerWithMaster()
 
@@ -249,6 +249,10 @@ private[spark] class Worker(
       logInfo(s"$actor_ terminated !")
       masterDisconnected()
 
+    case x: DisassociatedEvent =>
+      logInfo(s"$x Disassociated!")
+      masterDisconnected()
+
    case RequestWorkerState => {
      sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
        finishedExecutors.values.toList, activeMasterUrl, cores, memory,
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a98ec06be9..2818a775d0 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,11 +26,6 @@ import org.apache.spark.Logging
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{Utils, AkkaUtils}
-import akka.remote.DisassociatedEvent
-import akka.remote.AssociationErrorEvent
-import akka.remote.DisassociatedEvent
-import akka.actor.Terminated
-
 
 private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
@@ -82,7 +77,11 @@ private[spark] class CoarseGrainedExecutorBackend(
       }
 
     case Terminated(actor) =>
-      logError(s"Driver $actor terminated or disconnected! Shutting down.")
+      logError(s"Driver $actor terminated! Shutting down.")
+      System.exit(1)
+
+    case x: DisassociatedEvent =>
+      logError(s"Driver $x disassociated! Shutting down.")
       System.exit(1)
 
     case StopExecutor =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 821c30a119..e316f6b41f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -121,6 +121,9 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
     case Terminated(actor) =>
       actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
 
+    case DisassociatedEvent(_, address, _) =>
+      addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
+
     }
 
     // Make fake resource offers on all executors
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 2a831382df..90a5387b2b 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -44,9 +44,11 @@ private[spark] object AkkaUtils {
   val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
   val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
-  val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "30").toInt
-  val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "30").toInt
-  val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
+  val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "60").toInt
+  val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble
+  // We have our own heartbeat mechanism, and TCP already tracks connections, so the transport
+  // failure detector adds little; giving it a relatively large interval suffices.
+  val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
 
   val akkaConf = ConfigFactory.parseString(
     s"""
@@ -56,8 +58,8 @@ private[spark] object AkkaUtils {
       |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
       |akka.remote.watch-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
       |akka.remote.watch-failure-detector.threshold = $akkaFailureDetector
-      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
-      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+      |akka.remote.transport-failure-detector.heartbeat-interval = 30 s
+      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = ${akkaHeartBeatPauses + 10} s
       |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
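
For readers unfamiliar with the remoting event stream this patch relies on, here is a minimal, self-contained sketch (not part of the patch) of how an actor learns about disassociations: they are published on the ActorSystem's event stream when the remote TCP socket closes, rather than delivered through watch(). It assumes Akka 2.2.x; ConnectionWatcher and the system name are illustrative names, not Spark's.

import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import com.typesafe.config.ConfigFactory

// Illustrative stand-in for Master/Worker: subscribe once in preStart, then
// handle DisassociatedEvent like any other message.
class ConnectionWatcher extends Actor with ActorLogging {
  override def preStart() {
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }

  def receive = {
    case DisassociatedEvent(_, remoteAddress, _) =>
      // A real handler would look remoteAddress up in an address-to-state
      // map, as Master does with addressToWorker / addressToApp.
      log.info(s"$remoteAddress got disassociated, cleaning up.")

    case _: RemotingLifecycleEvent => // ignore other lifecycle events
  }
}

object ConnectionWatcherMain extends App {
  // Remoting must be enabled for lifecycle events to exist; port 0 picks a
  // free port so the sketch can run anywhere.
  val system = ActorSystem("watcher", ConfigFactory.parseString("""
    |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
    |akka.remote.netty.tcp.port = 0
    """.stripMargin))
  system.actorOf(Props[ConnectionWatcher], "connectionWatcher")
}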
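
And a standalone sketch of what the tuned defaults in AkkaUtils amount to, again assuming Akka 2.2.x (the config keys are the real Akka ones used above; the object and system names are illustrative): death watch gets a frequent heartbeat with a generous pause, while the transport-level detector heartbeats slowly, since TCP itself reports a closed socket as a disassociation.

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object FailureDetectorDefaults extends App {
  // Same values as the patched AkkaUtils defaults.
  val akkaHeartBeatPauses = 60
  val akkaFailureDetector = 12.0
  val akkaHeartBeatInterval = 3

  val conf = ConfigFactory.parseString(s"""
    |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
    |akka.remote.netty.tcp.port = 0
    |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
    |akka.remote.watch-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
    |akka.remote.watch-failure-detector.threshold = $akkaFailureDetector
    |akka.remote.transport-failure-detector.heartbeat-interval = 30 s
    |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = ${akkaHeartBeatPauses + 10} s
    """.stripMargin)

  val system = ActorSystem("tuned", conf)
  // Prints "70 s": the transport detector tolerates pauses well beyond the
  // watch detector, because it is no longer the primary failure signal.
  println(system.settings.config.getString(
    "akka.remote.transport-failure-detector.acceptable-heartbeat-pause"))
  system.shutdown()
}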