[SPARK-29400][CORE] Improve PrometheusResource to use labels

### What changes were proposed in this pull request?

[SPARK-29064](https://github.com/apache/spark/pull/25770) introduced `PrometheusResource` to expose `ExecutorSummary`. This PR aims to make it more `Prometheus`-friendly by using [Prometheus labels](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels) instead of embedding the application and executor IDs in the metric names.
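
The label string is built once per executor and appended to every metric name, so all executor metrics now share one stable name. Below is a minimal, self-contained sketch of that rendering logic, distilled from the patch further down; the hard-coded values are stand-ins taken from the sample output in the next section.

```scala
// Minimal, runnable sketch of the label rendering added by this patch.
// The hard-coded values stand in for what PrometheusResource actually reads
// from the SparkUI app status store (store.applicationInfo, executor.id).
object LabelRenderingSketch {
  def main(args: Array[String]): Unit = {
    val labels = Seq(
      "application_id" -> "app-20191008151625-0000",
      "application_name" -> "Spark shell",
      "executor_id" -> "driver"
    ).map { case (k, v) => s"""$k="$v"""" }.mkString("{", ", ", "}")
    // Prints: metrics_executor_rddBlocks_Count{application_id="...", ...} 0
    println(s"metrics_executor_rddBlocks_Count$labels 0")
  }
}
```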

### Why are the changes needed?

**BEFORE**
```
metrics_app_20191008151432_0000_driver_executor_rddBlocks_Count 0
metrics_app_20191008151432_0000_driver_executor_memoryUsed_Count 0
metrics_app_20191008151432_0000_driver_executor_diskUsed_Count 0
```

**AFTER**
```
$ curl -s http://localhost:4040/metrics/executors/prometheus/ | head -n3
metrics_executor_rddBlocks_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
metrics_executor_memoryUsed_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
metrics_executor_diskUsed_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
```
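
Labels also remove the need for the old name-sanitization step: Prometheus metric names are restricted to the charset `[a-zA-Z_:][a-zA-Z0-9_:]*`, so the removed code had to rewrite the raw application ID (which contains `-`) before embedding it in the name, whereas label values may carry arbitrary strings unchanged. A small illustrative sketch, reusing the sample ID from above:

```scala
// Dashes are illegal in Prometheus metric names but fine inside label values,
// which is why the old code needed `replaceAll` and the new code does not.
object SanitizationSketch {
  def main(args: Array[String]): Unit = {
    val appId = "app-20191008151625-0000"
    println(appId.replaceAll("[^a-zA-Z0-9]", "_")) // old: app_20191008151625_0000
    println(s"""application_id="$appId"""")        // new: application_id="app-20191008151625-0000"
  }
}
```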

### Does this PR introduce any user-facing change?

No, but `Prometheus` understands the new label format natively and displays the metrics more intelligently, since series can now be filtered and aggregated by `application_id`, `application_name`, and `executor_id`, as the screenshot below shows.

<img width="735" alt="ui" src="https://user-images.githubusercontent.com/9700541/66438279-1756f900-e9e1-11e9-91c7-c04c6ce9172f.png">

### How was this patch tested?

Manually.

**SETUP**
```
$ sbin/start-master.sh
$ sbin/start-slave.sh spark://`hostname`:7077
$ bin/spark-shell --master spark://`hostname`:7077 --conf spark.ui.prometheus.enabled=true
```

**RESULT**
```
$ curl -s http://localhost:4040/metrics/executors/prometheus/ | head -n3
metrics_executor_rddBlocks_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
metrics_executor_memoryUsed_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
metrics_executor_diskUsed_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
```

Closes #26060 from dongjoon-hyun/SPARK-29400.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>

**core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala**

```diff
@@ -40,30 +40,35 @@ private[v1] class PrometheusResource extends ApiRequestContext {
   def executors(): String = {
     val sb = new StringBuilder
     val store = uiRoot.asInstanceOf[SparkUI].store
-    val appId = store.applicationInfo.id.replaceAll("[^a-zA-Z0-9]", "_")
     store.executorList(true).foreach { executor =>
-      val prefix = s"metrics_${appId}_${executor.id}_executor_"
-      sb.append(s"${prefix}rddBlocks_Count ${executor.rddBlocks}\n")
-      sb.append(s"${prefix}memoryUsed_Count ${executor.memoryUsed}\n")
-      sb.append(s"${prefix}diskUsed_Count ${executor.diskUsed}\n")
-      sb.append(s"${prefix}totalCores_Count ${executor.totalCores}\n")
-      sb.append(s"${prefix}maxTasks_Count ${executor.maxTasks}\n")
-      sb.append(s"${prefix}activeTasks_Count ${executor.activeTasks}\n")
-      sb.append(s"${prefix}failedTasks_Count ${executor.failedTasks}\n")
-      sb.append(s"${prefix}completedTasks_Count ${executor.completedTasks}\n")
-      sb.append(s"${prefix}totalTasks_Count ${executor.totalTasks}\n")
-      sb.append(s"${prefix}totalDuration_Value ${executor.totalDuration}\n")
-      sb.append(s"${prefix}totalGCTime_Value ${executor.totalGCTime}\n")
-      sb.append(s"${prefix}totalInputBytes_Count ${executor.totalInputBytes}\n")
-      sb.append(s"${prefix}totalShuffleRead_Count ${executor.totalShuffleRead}\n")
-      sb.append(s"${prefix}totalShuffleWrite_Count ${executor.totalShuffleWrite}\n")
-      sb.append(s"${prefix}maxMemory_Count ${executor.maxMemory}\n")
+      val prefix = "metrics_executor_"
+      val labels = Seq(
+        "application_id" -> store.applicationInfo.id,
+        "application_name" -> store.applicationInfo.name,
+        "executor_id" -> executor.id
+      ).map { case (k, v) => s"""$k="$v"""" }.mkString("{", ", ", "}")
+      sb.append(s"${prefix}rddBlocks_Count$labels ${executor.rddBlocks}\n")
+      sb.append(s"${prefix}memoryUsed_Count$labels ${executor.memoryUsed}\n")
+      sb.append(s"${prefix}diskUsed_Count$labels ${executor.diskUsed}\n")
+      sb.append(s"${prefix}totalCores_Count$labels ${executor.totalCores}\n")
+      sb.append(s"${prefix}maxTasks_Count$labels ${executor.maxTasks}\n")
+      sb.append(s"${prefix}activeTasks_Count$labels ${executor.activeTasks}\n")
+      sb.append(s"${prefix}failedTasks_Count$labels ${executor.failedTasks}\n")
+      sb.append(s"${prefix}completedTasks_Count$labels ${executor.completedTasks}\n")
+      sb.append(s"${prefix}totalTasks_Count$labels ${executor.totalTasks}\n")
+      sb.append(s"${prefix}totalDuration_Value$labels ${executor.totalDuration}\n")
+      sb.append(s"${prefix}totalGCTime_Value$labels ${executor.totalGCTime}\n")
+      sb.append(s"${prefix}totalInputBytes_Count$labels ${executor.totalInputBytes}\n")
+      sb.append(s"${prefix}totalShuffleRead_Count$labels ${executor.totalShuffleRead}\n")
+      sb.append(s"${prefix}totalShuffleWrite_Count$labels ${executor.totalShuffleWrite}\n")
+      sb.append(s"${prefix}maxMemory_Count$labels ${executor.maxMemory}\n")
       executor.executorLogs.foreach { case (k, v) => }
       executor.memoryMetrics.foreach { m =>
-        sb.append(s"${prefix}usedOnHeapStorageMemory_Count ${m.usedOnHeapStorageMemory}\n")
-        sb.append(s"${prefix}usedOffHeapStorageMemory_Count ${m.usedOffHeapStorageMemory}\n")
-        sb.append(s"${prefix}totalOnHeapStorageMemory_Count ${m.totalOnHeapStorageMemory}\n")
-        sb.append(s"${prefix}totalOffHeapStorageMemory_Count ${m.totalOffHeapStorageMemory}\n")
+        sb.append(s"${prefix}usedOnHeapStorageMemory_Count$labels ${m.usedOnHeapStorageMemory}\n")
+        sb.append(s"${prefix}usedOffHeapStorageMemory_Count$labels ${m.usedOffHeapStorageMemory}\n")
+        sb.append(s"${prefix}totalOnHeapStorageMemory_Count$labels ${m.totalOnHeapStorageMemory}\n")
+        sb.append(s"${prefix}totalOffHeapStorageMemory_Count$labels " +
+          s"${m.totalOffHeapStorageMemory}\n")
       }
       executor.peakMemoryMetrics.foreach { m =>
         val names = Array(
@@ -89,7 +94,7 @@ private[v1] class PrometheusResource extends ApiRequestContext {
           "MajorGCTime"
         )
         names.foreach { name =>
-          sb.append(s"$prefix${name}_Count ${m.getMetricValue(name)}\n")
+          sb.append(s"$prefix${name}_Count$labels ${m.getMetricValue(name)}\n")
         }
       }
     }
```