[SPARK-9202] Cap the maximum number of executor and driver records kept in the Worker

https://issues.apache.org/jira/browse/SPARK-9202

Author: CodingCat <zhunansjtu@gmail.com>

Closes #7714 from CodingCat/SPARK-9202 and squashes the following commits:

23977fb [CodingCat] add comments about why we don't synchronize finishedExecutors & finishedDrivers
dc9772d [CodingCat] addressing the comments
e125241 [CodingCat] stylistic fix
80bfe52 [CodingCat] fix JsonProtocolSuite
d7d9485 [CodingCat] styistic fix and respect insert ordering
031755f [CodingCat] add license info & stylistic fix
c3b5361 [CodingCat] test cases and docs
c557b3a [CodingCat] applications are fine
9cac751 [CodingCat] application is fine...
ad87ed7 [CodingCat] trimFinishedExecutorsAndDrivers
Authored by CodingCat on 2015-07-31 20:27:00 +01:00; committed by Sean Owen
Parent: a8340fa7df  Commit: c0686668ae
6 changed files with 329 additions and 94 deletions


@@ -25,7 +25,7 @@ import java.util.concurrent._
import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
import scala.concurrent.ExecutionContext
import scala.util.Random
import scala.util.control.NonFatal
@@ -115,13 +115,18 @@ private[worker] class Worker(
}
var workDir: File = null
val finishedExecutors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]
val drivers = new HashMap[String, DriverRunner]
val executors = new HashMap[String, ExecutorRunner]
val finishedDrivers = new HashMap[String, DriverRunner]
val finishedDrivers = new LinkedHashMap[String, DriverRunner]
val appDirectories = new HashMap[String, Seq[String]]
val finishedApps = new HashSet[String]
val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors",
WorkerWebUI.DEFAULT_RETAINED_EXECUTORS)
val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
WorkerWebUI.DEFAULT_RETAINED_DRIVERS)
// The shuffle service is not actually started unless configured.
private val shuffleService = new ExternalShuffleService(conf, securityMgr)
@@ -461,25 +466,7 @@ private[worker] class Worker(
}
case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
sendToMaster(executorStateChanged)
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
executors.get(fullId) match {
case Some(executor) =>
logInfo("Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
executors -= fullId
finishedExecutors(fullId) = executor
coresUsed -= executor.cores
memoryUsed -= executor.memory
case None =>
logInfo("Unknown Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
}
maybeCleanupApplication(appId)
}
handleExecutorStateChanged(executorStateChanged)
case KillExecutor(masterUrl, appId, execId) =>
if (masterUrl != activeMasterUrl) {
@@ -523,24 +510,8 @@ private[worker] class Worker(
}
}
case driverStageChanged @ DriverStateChanged(driverId, state, exception) => {
state match {
case DriverState.ERROR =>
logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
case DriverState.FAILED =>
logWarning(s"Driver $driverId exited with failure")
case DriverState.FINISHED =>
logInfo(s"Driver $driverId exited successfully")
case DriverState.KILLED =>
logInfo(s"Driver $driverId was killed by user")
case _ =>
logDebug(s"Driver $driverId changed state to $state")
}
sendToMaster(driverStageChanged)
val driver = drivers.remove(driverId).get
finishedDrivers(driverId) = driver
memoryUsed -= driver.driverDesc.mem
coresUsed -= driver.driverDesc.cores
case driverStateChanged @ DriverStateChanged(driverId, state, exception) => {
handleDriverStateChanged(driverStateChanged)
}
case ReregisterWithMaster =>
@@ -614,6 +585,78 @@ private[worker] class Worker(
webUi.stop()
metricsSystem.stop()
}
private def trimFinishedExecutorsIfNecessary(): Unit = {
// No locking is needed here: both WorkerPage and the REST server read this data
// through the thread-safe RpcEndpoint
if (finishedExecutors.size > retainedExecutors) {
finishedExecutors.take(math.max(finishedExecutors.size / 10, 1)).foreach {
case (executorId, _) => finishedExecutors.remove(executorId)
}
}
}
private def trimFinishedDriversIfNecessary(): Unit = {
// No locking is needed here: both WorkerPage and the REST server read this data
// through the thread-safe RpcEndpoint
if (finishedDrivers.size > retainedDrivers) {
finishedDrivers.take(math.max(finishedDrivers.size / 10, 1)).foreach {
case (driverId, _) => finishedDrivers.remove(driverId)
}
}
}
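The trimming above depends on finishedExecutors and finishedDrivers now being LinkedHashMaps: insertion order is preserved, so take(n) returns the oldest finished entries, and roughly 10% of them (at least one) are evicted once the retained cap is exceeded. A minimal standalone sketch of the same eviction policy (the object, value, and method names below are illustrative, not part of the patch):

import scala.collection.mutable.LinkedHashMap

object TrimSketch {
  // illustrative cap; the Worker reads this from spark.worker.ui.retainedExecutors
  val retained = 3
  val finished = new LinkedHashMap[String, String]()

  def recordFinished(id: String, summary: String): Unit = {
    finished(id) = summary
    if (finished.size > retained) {
      // evict the oldest ~10% (at least one entry), in insertion order
      finished.take(math.max(finished.size / 10, 1)).foreach {
        case (oldId, _) => finished.remove(oldId)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    (1 to 5).foreach(i => recordFinished(s"app1/$i", "EXITED"))
    println(finished.keys.mkString(", ")) // prints: app1/3, app1/4, app1/5
  }
}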
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
val driverId = driverStateChanged.driverId
val exception = driverStateChanged.exception
val state = driverStateChanged.state
state match {
case DriverState.ERROR =>
logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
case DriverState.FAILED =>
logWarning(s"Driver $driverId exited with failure")
case DriverState.FINISHED =>
logInfo(s"Driver $driverId exited successfully")
case DriverState.KILLED =>
logInfo(s"Driver $driverId was killed by user")
case _ =>
logDebug(s"Driver $driverId changed state to $state")
}
sendToMaster(driverStateChanged)
val driver = drivers.remove(driverId).get
finishedDrivers(driverId) = driver
trimFinishedDriversIfNecessary()
memoryUsed -= driver.driverDesc.mem
coresUsed -= driver.driverDesc.cores
}
private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
Unit = {
sendToMaster(executorStateChanged)
val state = executorStateChanged.state
if (ExecutorState.isFinished(state)) {
val appId = executorStateChanged.appId
val fullId = appId + "/" + executorStateChanged.execId
val message = executorStateChanged.message
val exitStatus = executorStateChanged.exitStatus
executors.get(fullId) match {
case Some(executor) =>
logInfo("Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
executors -= fullId
finishedExecutors(fullId) = executor
trimFinishedExecutorsIfNecessary()
coresUsed -= executor.cores
memoryUsed -= executor.memory
case None =>
logInfo("Unknown Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
}
maybeCleanupApplication(appId)
}
}
}
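Extracting handleExecutorStateChanged and handleDriverStateChanged as private[worker] methods also makes these state transitions directly testable: the new WorkerSuite cases below construct a Worker and call the handlers without going through the RPC loop.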
private[deploy] object Worker extends Logging {
@@ -669,5 +712,4 @@ private[deploy] object Worker extends Logging {
cmd
}
}
}


@@ -53,6 +53,8 @@ class WorkerWebUI(
}
}
private[ui] object WorkerWebUI {
private[worker] object WorkerWebUI {
val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
val DEFAULT_RETAINED_DRIVERS = 1000
val DEFAULT_RETAINED_EXECUTORS = 1000
}


@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy
import java.io.File
import java.util.Date
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.{SecurityManager, SparkConf}
private[deploy] object DeployTestUtils {
def createAppDesc(): ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
}
def createAppInfo() : ApplicationInfo = {
val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
"id", createAppDesc(), JsonConstants.submitDate, null, Int.MaxValue)
appInfo.endTime = JsonConstants.currTimeInMillis
appInfo
}
def createDriverCommand(): Command = new Command(
"org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
)
def createDriverDesc(): DriverDescription =
new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand())
def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
createDriverDesc(), new Date())
def createWorkerInfo(): WorkerInfo = {
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
workerInfo
}
def createExecutorRunner(execId: Int): ExecutorRunner = {
new ExecutorRunner(
"appId",
execId,
createAppDesc(),
4,
1234,
null,
"workerId",
"host",
123,
"publicAddress",
new File("sparkHome"),
new File("workDir"),
"akka://worker",
new SparkConf,
Seq("localDir"),
ExecutorState.RUNNING)
}
def createDriverRunner(driverId: String): DriverRunner = {
val conf = new SparkConf()
new DriverRunner(
conf,
driverId,
new File("workDir"),
new File("sparkHome"),
createDriverDesc(),
null,
"akka://worker",
new SecurityManager(conf))
}
}
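DeployTestUtils consolidates the builder helpers that previously lived only in JsonProtocolSuite, so both JsonProtocolSuite and the new WorkerSuite can share them via import org.apache.spark.deploy.DeployTestUtils._.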


@@ -17,7 +17,6 @@
package org.apache.spark.deploy
import java.io.File
import java.util.Date
import com.fasterxml.jackson.core.JsonParseException
@@ -25,12 +24,14 @@ import org.json4s._
import org.json4s.jackson.JsonMethods
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.master.{ApplicationInfo, RecoveryState}
import org.apache.spark.deploy.worker.ExecutorRunner
import org.apache.spark.{JsonTestUtils, SparkFunSuite}
class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
import org.apache.spark.deploy.DeployTestUtils._
test("writeApplicationInfo") {
val output = JsonProtocol.writeApplicationInfo(createAppInfo())
assertValidJson(output)
@@ -50,7 +51,7 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
}
test("writeExecutorRunner") {
val output = JsonProtocol.writeExecutorRunner(createExecutorRunner())
val output = JsonProtocol.writeExecutorRunner(createExecutorRunner(123))
assertValidJson(output)
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.executorRunnerJsonStr))
}
@@ -77,9 +78,10 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
test("writeWorkerState") {
val executors = List[ExecutorRunner]()
val finishedExecutors = List[ExecutorRunner](createExecutorRunner(), createExecutorRunner())
val drivers = List(createDriverRunner())
val finishedDrivers = List(createDriverRunner(), createDriverRunner())
val finishedExecutors = List[ExecutorRunner](createExecutorRunner(123),
createExecutorRunner(123))
val drivers = List(createDriverRunner("driverId"))
val finishedDrivers = List(createDriverRunner("driverId"), createDriverRunner("driverId"))
val stateResponse = new WorkerStateResponse("host", 8080, "workerId", executors,
finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl")
val output = JsonProtocol.writeWorkerState(stateResponse)
@@ -87,47 +89,6 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerStateJsonStr))
}
def createAppDesc(): ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
}
def createAppInfo() : ApplicationInfo = {
val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
"id", createAppDesc(), JsonConstants.submitDate, null, Int.MaxValue)
appInfo.endTime = JsonConstants.currTimeInMillis
appInfo
}
def createDriverCommand(): Command = new Command(
"org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
)
def createDriverDesc(): DriverDescription =
new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand())
def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
createDriverDesc(), new Date())
def createWorkerInfo(): WorkerInfo = {
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
workerInfo
}
def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
"publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
}
def createDriverRunner(): DriverRunner = {
val conf = new SparkConf()
new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
createDriverDesc(), null, "akka://worker", new SecurityManager(conf))
}
def assertValidJson(json: JValue) {
try {
JsonMethods.parse(JsonMethods.compact(json))


@@ -17,13 +17,18 @@
package org.apache.spark.deploy.worker
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.Command
import org.scalatest.Matchers
import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.{Command, ExecutorState}
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
class WorkerSuite extends SparkFunSuite with Matchers {
import org.apache.spark.deploy.DeployTestUtils._
def cmd(javaOpts: String*): Command = {
Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts : _*))
}
@@ -56,4 +61,126 @@ class WorkerSuite extends SparkFunSuite with Matchers {
"-Dspark.ssl.useNodeLocalConf=true", "-Dspark.ssl.opt1=y", "-Dspark.ssl.opt2=z")
}
test("test clearing of finishedExecutors (small number of executors)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedExecutors", 2.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
// pre-populate the worker's executors map
for (i <- 0 until 5) {
worker.executors += s"app1/$i" -> createExecutorRunner(i)
}
// finish the first executor via an ExecutorStateChanged message
worker.handleExecutorStateChanged(
ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
assert(worker.finishedExecutors.size === 1)
assert(worker.executors.size === 4)
for (i <- 1 until 5) {
worker.handleExecutorStateChanged(
ExecutorStateChanged("app1", i, ExecutorState.EXITED, None, None))
assert(worker.finishedExecutors.size === 2)
if (i > 1) {
assert(!worker.finishedExecutors.contains(s"app1/${i - 2}"))
}
assert(worker.executors.size === 4 - i)
}
}
test("test clearing of finishedExecutors (more executors)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedExecutors", 30.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
// pre-populate the worker's executors map
for (i <- 0 until 50) {
worker.executors += s"app1/$i" -> createExecutorRunner(i)
}
// finish the first executor via an ExecutorStateChanged message
worker.handleExecutorStateChanged(
ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
assert(worker.finishedExecutors.size === 1)
assert(worker.executors.size === 49)
for (i <- 1 until 50) {
val expectedValue = {
if (worker.finishedExecutors.size < 30) {
worker.finishedExecutors.size + 1
} else {
28
}
}
worker.handleExecutorStateChanged(
ExecutorStateChanged("app1", i, ExecutorState.EXITED, None, None))
if (expectedValue == 28) {
for (j <- i - 30 until i - 27) {
assert(!worker.finishedExecutors.contains(s"app1/$j"))
}
}
assert(worker.executors.size === 49 - i)
assert(worker.finishedExecutors.size === expectedValue)
}
}
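The expected value of 28 above follows directly from the trimming policy: with spark.worker.ui.retainedExecutors = 30, the trim only fires when a newly finished executor pushes finishedExecutors to 31 entries, at which point math.max(31 / 10, 1) = 3 of the oldest entries are dropped, bringing the size back to 28. The finishedDrivers test further below relies on the same arithmetic with spark.worker.ui.retainedDrivers = 30.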
test("test clearing of finishedDrivers (small number of drivers)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedDrivers", 2.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
// pre-populate the worker's drivers map
for (i <- 0 until 5) {
val driverId = s"driverId-$i"
worker.drivers += driverId -> createDriverRunner(driverId)
}
// finish the first driver via a DriverStateChanged message
worker.handleDriverStateChanged(DriverStateChanged("driverId-0", DriverState.FINISHED, None))
assert(worker.drivers.size === 4)
assert(worker.finishedDrivers.size === 1)
for (i <- 1 until 5) {
val driverId = s"driverId-$i"
worker.handleDriverStateChanged(DriverStateChanged(driverId, DriverState.FINISHED, None))
if (i > 1) {
assert(!worker.finishedDrivers.contains(s"driverId-${i - 2}"))
}
assert(worker.drivers.size === 4 - i)
assert(worker.finishedDrivers.size === 2)
}
}
test("test clearing of finishedDrivers (more drivers)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedDrivers", 30.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
// pre-populate the worker's drivers map
for (i <- 0 until 50) {
val driverId = s"driverId-$i"
worker.drivers += driverId -> createDriverRunner(driverId)
}
// finish the first driver via a DriverStateChanged message
worker.handleDriverStateChanged(DriverStateChanged("driverId-0", DriverState.FINISHED, None))
assert(worker.finishedDrivers.size === 1)
assert(worker.drivers.size === 49)
for (i <- 1 until 50) {
val expectedValue = {
if (worker.finishedDrivers.size < 30) {
worker.finishedDrivers.size + 1
} else {
28
}
}
val driverId = s"driverId-$i"
worker.handleDriverStateChanged(DriverStateChanged(driverId, DriverState.FINISHED, None))
if (expectedValue == 28) {
for (j <- i - 30 until i - 27) {
assert(!worker.finishedDrivers.contains(s"driverId-$j"))
}
}
assert(worker.drivers.size === 49 - i)
assert(worker.finishedDrivers.size === expectedValue)
}
}
}


@@ -557,6 +557,20 @@ Apart from these, the following properties are also available, and may be useful
collecting.
</td>
</tr>
<tr>
<td><code>spark.worker.ui.retainedExecutors</code></td>
<td>1000</td>
<td>
How many finished executors the Spark UI and status APIs remember before garbage collecting.
</td>
</tr>
<tr>
<td><code>spark.worker.ui.retainedDrivers</code></td>
<td>1000</td>
<td>
How many finished drivers the Spark UI and status APIs remember before garbage collecting.
</td>
</tr>
</table>
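A minimal sketch of overriding these caps, mirroring how the new WorkerSuite tests configure a Worker (the value 200 is arbitrary and chosen only for illustration):

import org.apache.spark.SparkConf

// Illustrative only: retain at most 200 finished executor and driver records
// instead of the default 1000.
val conf = new SparkConf()
  .set("spark.worker.ui.retainedExecutors", "200")
  .set("spark.worker.ui.retainedDrivers", "200")

Note that the Worker reads these values from its own SparkConf (see the conf.getInt calls in the Worker diff above), so they take effect per worker process rather than per application.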
#### Compression and Serialization