A few clean up for yarn 2.0 code

This commit is contained in:
Raymond Liu 2013-12-17 13:19:14 +08:00
parent 7815a3ace9
commit dd6d347f4f
2 changed files with 7 additions and 8 deletions

View file

@ -39,28 +39,27 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
def this(args: ApplicationMasterArguments) = this(args, new Configuration()) def this(args: ApplicationMasterArguments) = this(args, new Configuration())
private val rpc: YarnRPC = YarnRPC.create(conf) private val rpc: YarnRPC = YarnRPC.create(conf)
private var resourceManager: AMRMProtocol = null private var resourceManager: AMRMProtocol = _
private var appAttemptId: ApplicationAttemptId = null private var appAttemptId: ApplicationAttemptId = _
private var reporterThread: Thread = null private var reporterThread: Thread = _
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = null private var yarnAllocator: YarnAllocationHandler = _
private var driverClosed:Boolean = false private var driverClosed:Boolean = false
private val sparkConf = new SparkConf private val sparkConf = new SparkConf
val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf)._1 conf = sparkConf)._1
var actor: ActorRef = null var actor: ActorRef = _
// This actor just working as a monitor to watch on Driver Actor. // This actor just working as a monitor to watch on Driver Actor.
class MonitorActor(driverUrl: String) extends Actor { class MonitorActor(driverUrl: String) extends Actor {
var driver: ActorSelection = null var driver: ActorSelection = _
override def preStart() { override def preStart() {
logInfo("Listen to driver: " + driverUrl) logInfo("Listen to driver: " + driverUrl)
driver = context.actorSelection(driverUrl) driver = context.actorSelection(driverUrl)
driver ! "hello"
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
} }

View file

@ -51,7 +51,7 @@ class WorkerRunnable(
extends Runnable with Logging { extends Runnable with Logging {
var rpc: YarnRPC = YarnRPC.create(conf) var rpc: YarnRPC = YarnRPC.create(conf)
var cm: ContainerManager = null var cm: ContainerManager = _
val yarnConf: YarnConfiguration = new YarnConfiguration(conf) val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run = { def run = {