[SPARK-9602] remove "Akka/Actor" words from comments

https://issues.apache.org/jira/browse/SPARK-9602

Although we have hidden Akka behind the RPC interface, I found that Akka/Actor-related comments are still scattered throughout the codebase. To keep the terminology consistent, we should remove the "actor"/"akka" wording from those comments...

Author: CodingCat <zhunansjtu@gmail.com>

Closes #7936 from CodingCat/SPARK-9602 and squashes the following commits:

e8296a3 [CodingCat] remove actor words from comments
Authored by CodingCat on 2015-08-04 14:54:11 -07:00; committed by Reynold Xin
parent ab8ee1a3b9
commit 9d668b7368
16 changed files with 27 additions and 31 deletions
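
For readers who only know the old Akka terminology, here is a minimal, hypothetical sketch of the org.apache.spark.rpc abstraction that the reworded comments below refer to. Echo, EchoEndpoint and the package name are made-up for illustration; RpcEnv, RpcEndpoint, setupEndpoint and send are the real private[spark] API of this era, which is why the sketch assumes it lives under the org.apache.spark package.

```scala
package org.apache.spark.sketch  // rpc API is private[spark], so this sketch assumes a home under org.apache.spark

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

// Hypothetical message and endpoint, for illustration only.
case class Echo(msg: String)

class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  // receive handles fire-and-forget send(); receiveAndReply would handle ask()-style messages.
  override def receive: PartialFunction[Any, Unit] = {
    case Echo(msg) => println(s"got: $msg")
  }
}

object EchoExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // RpcEnv.create plays the role of starting an ActorSystem;
    // setupEndpoint plays the role of actorOf.
    val rpcEnv = RpcEnv.create("echo-system", "localhost", 12345, conf, new SecurityManager(conf))
    val echoRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
    echoRef.send(Echo("hello"))  // one-way message, like the old `actor ! msg`
    rpcEnv.shutdown()
    rpcEnv.awaitTermination()
  }
}
```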


@@ -794,7 +794,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
/**
* We try to reuse a single Socket to transfer accumulator updates, as they are all added
- * by the DAGScheduler's single-threaded actor anyway.
+ * by the DAGScheduler's single-threaded RpcEndpoint anyway.
*/
@transient var socket: Socket = _


@@ -73,12 +73,8 @@ class LocalSparkCluster(
def stop() {
logInfo("Shutting down local Spark cluster.")
// Stop the workers before the master so they don't get upset that it disconnected
- // TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
- // This is unfortunate, but for now we just comment it out.
workerRpcEnvs.foreach(_.shutdown())
- // workerActorSystems.foreach(_.awaitTermination())
masterRpcEnvs.foreach(_.shutdown())
- // masterActorSystems.foreach(_.awaitTermination())
masterRpcEnvs.clear()
workerRpcEnvs.clear()
}


@@ -257,7 +257,7 @@ private[spark] class AppClient(
}
def start() {
- // Just launch an actor; it will call back into the listener.
+ // Just launch an rpcEndpoint; it will call back into the listener.
endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
}


@@ -26,7 +26,7 @@ import org.apache.spark.annotation.DeveloperApi
*/
@DeveloperApi
trait LeaderElectionAgent {
- val masterActor: LeaderElectable
+ val masterInstance: LeaderElectable
def stop() {} // to avoid noops in implementations.
}
@@ -37,7 +37,7 @@ trait LeaderElectable {
}
/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
- private[spark] class MonarchyLeaderAgent(val masterActor: LeaderElectable)
+ private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable)
extends LeaderElectionAgent {
- masterActor.electedLeader()
+ masterInstance.electedLeader()
}
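
As a usage note on the renamed field: anything implementing LeaderElectable can be handed to a LeaderElectionAgent, which promotes or demotes it via electedLeader()/revokedLeadership(). A small hypothetical sketch in the same spirit as the CustomLeaderElectionAgent test further down (LoggingMaster and AlwaysLeaderAgent are made-up names; in Spark the Master rpcEndpoint itself is the LeaderElectable):

```scala
import org.apache.spark.deploy.master.{LeaderElectable, LeaderElectionAgent}

// Hypothetical LeaderElectable, standing in for the Master rpcEndpoint.
class LoggingMaster extends LeaderElectable {
  override def electedLeader(): Unit = println("elected leader")
  override def revokedLeadership(): Unit = println("leadership revoked")
}

// Like MonarchyLeaderAgent: no election happens, so the wrapped masterInstance
// is promoted as soon as the agent is constructed.
class AlwaysLeaderAgent(val masterInstance: LeaderElectable) extends LeaderElectionAgent {
  masterInstance.electedLeader()
}

object LeaderElectionSketch extends App {
  new AlwaysLeaderAgent(new LoggingMaster)  // prints "elected leader"
}
```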


@@ -38,5 +38,5 @@ private[master] object MasterMessages {
case object BoundPortsRequest
- case class BoundPortsResponse(actorPort: Int, webUIPort: Int, restPort: Option[Int])
+ case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])
}


@@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}
import org.apache.spark.deploy.SparkCuratorUtil
- private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
+ private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
@@ -73,10 +73,10 @@ private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElecta
private def updateLeadershipStatus(isLeader: Boolean) {
if (isLeader && status == LeadershipStatus.NOT_LEADER) {
status = LeadershipStatus.LEADER
- masterActor.electedLeader()
+ masterInstance.electedLeader()
} else if (!isLeader && status == LeadershipStatus.LEADER) {
status = LeadershipStatus.NOT_LEADER
- masterActor.revokedLeadership()
+ masterInstance.revokedLeadership()
}
}


@@ -228,7 +228,7 @@ private[deploy] class Worker(
/**
* Re-register with the master because a network failure or a master failure has occurred.
* If the re-registration attempt threshold is exceeded, the worker exits with error.
- * Note that for thread-safety this should only be called from the actor.
+ * Note that for thread-safety this should only be called from the rpcEndpoint.
*/
private def reregisterWithMaster(): Unit = {
Utils.tryOrExit {
@@ -365,7 +365,8 @@ private[deploy] class Worker(
if (connected) { sendToMaster(Heartbeat(workerId, self)) }
case WorkDirCleanup =>
- // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
+ // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker
+ // rpcEndpoint.
// Copy ids so that it can be used in the cleanup thread.
val appIds = executors.values.map(_.appId).toSet
val cleanupFuture = concurrent.future {
@@ -684,7 +685,7 @@ private[deploy] object Worker extends Logging {
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): RpcEnv = {
- // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
+ // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
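
The reworded WorkDirCleanup comment above reflects a general rule that applies to rpcEndpoints just as it did to actors: message handlers should stay cheap and push blocking work onto another thread. A hypothetical sketch of that pattern (DoCleanup and CleanupEndpoint are made-up; only RpcEndpoint/RpcEnv are real, private[spark] types):

```scala
package org.apache.spark.sketch  // rpc API is private[spark]

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

// Hypothetical message and endpoint, for illustration only.
case object DoCleanup

class CleanupEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receive: PartialFunction[Any, Unit] = {
    case DoCleanup =>
      // Hand the slow, blocking work to a Future so the rpcEndpoint's
      // single-threaded message loop is not tied up.
      val cleanupFuture = Future {
        Thread.sleep(1000)  // stand-in for slow directory-cleanup I/O
      }
      cleanupFuture.onComplete {
        case Success(_) => println("cleanup finished")
        case Failure(e) => println(s"cleanup failed: ${e.getMessage}")
      }
  }
}
```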


@@ -43,7 +43,7 @@ private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: Strin
private[deploy] def setTesting(testing: Boolean) = isTesting = testing
private var isTesting = false
- // Lets us filter events only from the worker's actor system
+ // Lets filter events only from the worker's rpc system
private val expectedAddress = RpcAddress.fromURIString(workerUrl)
private def isWorker(address: RpcAddress) = expectedAddress == address
@@ -62,7 +62,7 @@ private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: Strin
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (isWorker(remoteAddress)) {
// This log message will never be seen
- logError(s"Lost connection to worker actor $workerUrl. Exiting.")
+ logError(s"Lost connection to worker rpc endpoint $workerUrl. Exiting.")
exitNonZero()
}
}


@@ -100,7 +100,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
val future = ask[T](message, timeout)
val result = timeout.awaitResult(future)
if (result == null) {
- throw new SparkException("Actor returned null")
+ throw new SparkException("RpcEndpoint returned null")
}
return result
} catch {
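
On the caller side, the ask/awaitResult/null-check logic shown above backs the blocking askWithRetry helper on RpcEndpointRef. A hypothetical caller-side sketch (Ping and BlockingAskSketch are made-up names; RpcEndpointRef and askWithRetry are the real private[spark] API of this era):

```scala
package org.apache.spark.sketch  // rpc API is private[spark]

import org.apache.spark.rpc.RpcEndpointRef

// Hypothetical request message, for illustration only.
case class Ping(payload: String)

object BlockingAskSketch {
  def pingBlocking(ref: RpcEndpointRef, payload: String): String = {
    // Blocks until the endpoint replies, retrying on RPC failures, and surfaces
    // the SparkException("RpcEndpoint returned null") seen above on a null reply.
    ref.askWithRetry[String](Ping(payload))
  }
}
```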


@@ -162,7 +162,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[spark] object OutputCommitCoordinator {
- // This actor is used only for RPC
+ // This endpoint is used only for RPC
private[spark] class OutputCommitCoordinatorEndpoint(
override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator)
extends RpcEndpoint with Logging {


@@ -22,7 +22,7 @@ import org.apache.spark.rpc.{RpcEndpointRef, RpcAddress}
/**
* Grouping of data for an executor used by CoarseGrainedSchedulerBackend.
*
- * @param executorEndpoint The ActorRef representing this executor
+ * @param executorEndpoint The RpcEndpointRef representing this executor
* @param executorAddress The network address of this executor
* @param executorHost The hostname that this executor is running on
* @param freeCores The current number of cores available for work on the executor


@@ -22,10 +22,10 @@ import java.util.concurrent.atomic.AtomicInteger
/**
* A util used to get a unique generation ID. This is a wrapper around Java's
* AtomicInteger. An example usage is in BlockManager, where each BlockManager
- * instance would start an Akka actor and we use this utility to assign the Akka
- * actors unique names.
+ * instance would start an RpcEndpoint and we use this utility to assign the RpcEndpoints'
+ * unique names.
*/
private[spark] class IdGenerator {
- private var id = new AtomicInteger
+ private val id = new AtomicInteger
def next: Int = id.incrementAndGet
}


@@ -99,7 +99,7 @@ object CustomPersistenceEngine {
@volatile var lastInstance: Option[CustomPersistenceEngine] = None
}
- class CustomLeaderElectionAgent(val masterActor: LeaderElectable) extends LeaderElectionAgent {
- masterActor.electedLeader()
+ class CustomLeaderElectionAgent(val masterInstance: LeaderElectable) extends LeaderElectionAgent {
+ masterInstance.electedLeader()
}


@@ -38,12 +38,11 @@ class WorkerWatcherSuite extends SparkFunSuite {
val conf = new SparkConf()
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val targetWorkerUrl = rpcEnv.uriOf("test", RpcAddress("1.2.3.4", 1234), "Worker")
- val otherAddress = "akka://test@4.3.2.1:1234/user/OtherActor"
- val otherAkkaAddress = RpcAddress("4.3.2.1", 1234)
+ val otherRpcAddress = RpcAddress("4.3.2.1", 1234)
val workerWatcher = new WorkerWatcher(rpcEnv, targetWorkerUrl)
workerWatcher.setTesting(testing = true)
rpcEnv.setupEndpoint("worker-watcher", workerWatcher)
- workerWatcher.onDisconnected(otherAkkaAddress)
+ workerWatcher.onDisconnected(otherRpcAddress)
assert(!workerWatcher.isShutDown)
rpcEnv.shutdown()
}


@@ -182,7 +182,7 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast"),
ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor")
+ "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint")
) ++ Seq(
// SPARK-4655 - Making Stage an Abstract class broke binary compatility even though
// the stage class is defined as private[spark]


@@ -981,7 +981,7 @@ class SparkILoop(
// which spins off a separate thread, then print the prompt and try
// our best to look ready. The interlocking lazy vals tend to
// inter-deadlock, so we break the cycle with a single asynchronous
- // message to an actor.
+ // message to an rpcEndpoint.
if (isAsync) {
intp initialize initializedCallback()
createAsyncListener() // listens for signal to run postInitialization