[SPARK-7007] [CORE] Add a metric source for ExecutorAllocationManager
Add a metric source to expose the internal status of ExecutorAllocationManager to better monitoring the resource usage of executors when dynamic allocation is enable. Please help to review, thanks a lot.
Author: jerryshao <saisai.shao@intel.com>
Closes #5589 from jerryshao/dynamic-allocation-source and squashes the following commits:
104d155 [jerryshao] rebase and address the comments
c501a2c [jerryshao] Address the comments
d237ba5 [jerryshao] Address the comments
2c3540f [jerryshao] Add a metric source for ExecutorAllocationManager
(cherry picked from commit 9f1f9b1037
)
Signed-off-by: Andrew Or <andrew@databricks.com>
This commit is contained in:
parent
acc877a989
commit
29350eef30
|
@ -21,7 +21,10 @@ import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
|
|
||||||
|
import com.codahale.metrics.{Gauge, MetricRegistry}
|
||||||
|
|
||||||
import org.apache.spark.scheduler._
|
import org.apache.spark.scheduler._
|
||||||
|
import org.apache.spark.metrics.source.Source
|
||||||
import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
|
import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -144,6 +147,9 @@ private[spark] class ExecutorAllocationManager(
|
||||||
private val executor =
|
private val executor =
|
||||||
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
|
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
|
||||||
|
|
||||||
|
// Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
|
||||||
|
val executorAllocationManagerSource = new ExecutorAllocationManagerSource
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify that the settings specified through the config are valid.
|
* Verify that the settings specified through the config are valid.
|
||||||
* If not, throw an appropriate exception.
|
* If not, throw an appropriate exception.
|
||||||
|
@ -579,6 +585,29 @@ private[spark] class ExecutorAllocationManager(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Metric source for ExecutorAllocationManager to expose its internal executor allocation
|
||||||
|
* status to MetricsSystem.
|
||||||
|
* Note: These metrics heavily rely on the internal implementation of
|
||||||
|
* ExecutorAllocationManager, metrics or value of metrics will be changed when internal
|
||||||
|
* implementation is changed, so these metrics are not stable across Spark version.
|
||||||
|
*/
|
||||||
|
private[spark] class ExecutorAllocationManagerSource extends Source {
|
||||||
|
val sourceName = "ExecutorAllocationManager"
|
||||||
|
val metricRegistry = new MetricRegistry()
|
||||||
|
|
||||||
|
private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = {
|
||||||
|
metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] {
|
||||||
|
override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) }
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0)
|
||||||
|
registerGauge("numberExecutorsPendingToRemove", executorsPendingToRemove.size, 0)
|
||||||
|
registerGauge("numberAllExecutors", executorIds.size, 0)
|
||||||
|
registerGauge("numberTargetExecutors", numExecutorsTarget, 0)
|
||||||
|
registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private object ExecutorAllocationManager {
|
private object ExecutorAllocationManager {
|
||||||
|
|
|
@ -537,6 +537,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
|
||||||
_taskScheduler.postStartHook()
|
_taskScheduler.postStartHook()
|
||||||
_env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
|
_env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
|
||||||
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
|
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
|
||||||
|
_executorAllocationManager.foreach { e =>
|
||||||
|
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
|
||||||
|
}
|
||||||
|
|
||||||
// Make sure the context is stopped if the user forgets about it. This avoids leaving
|
// Make sure the context is stopped if the user forgets about it. This avoids leaving
|
||||||
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
|
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
|
||||||
|
|
Loading…
Reference in a new issue