From e946104c42a1a7e6acb63b693357703458e872d1 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 10 Oct 2019 08:47:12 -0700
Subject: [PATCH] [SPARK-29400][CORE] Improve PrometheusResource to use labels

### What changes were proposed in this pull request?

[SPARK-29064](https://github.com/apache/spark/pull/25770) introduced `PrometheusResource` to expose `ExecutorSummary`. This PR aims to improve it further to be more `Prometheus`-friendly by using [Prometheus labels](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels).

### Why are the changes needed?

**BEFORE**
```
metrics_app_20191008151432_0000_driver_executor_rddBlocks_Count 0
metrics_app_20191008151432_0000_driver_executor_memoryUsed_Count 0
metrics_app_20191008151432_0000_driver_executor_diskUsed_Count 0
```

**AFTER**
```
$ curl -s http://localhost:4040/metrics/executors/prometheus/ | head -n3
metrics_executor_rddBlocks_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
metrics_executor_memoryUsed_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
metrics_executor_diskUsed_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
```

### Does this PR introduce any user-facing change?

No, but `Prometheus` understands the new format and displays it more intelligently.

### How was this patch tested?

Manually.

**SETUP**
```
$ sbin/start-master.sh
$ sbin/start-slave.sh spark://`hostname`:7077
$ bin/spark-shell --master spark://`hostname`:7077 --conf spark.ui.prometheus.enabled=true
```

**RESULT**
```
$ curl -s http://localhost:4040/metrics/executors/prometheus/ | head -n3
metrics_executor_rddBlocks_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
metrics_executor_memoryUsed_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
metrics_executor_diskUsed_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
```

Closes #26060 from dongjoon-hyun/SPARK-29400.

Authored-by: Dongjoon Hyun
Signed-off-by: Dongjoon Hyun
---
 .../status/api/v1/PrometheusResource.scala    | 49 ++++++++++---------
 1 file changed, 27 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
index 6e52e213bd..f9fb78e65a 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
@@ -40,30 +40,35 @@
   def executors(): String = {
     val sb = new StringBuilder
     val store = uiRoot.asInstanceOf[SparkUI].store
-    val appId = store.applicationInfo.id.replaceAll("[^a-zA-Z0-9]", "_")
     store.executorList(true).foreach { executor =>
-      val prefix = s"metrics_${appId}_${executor.id}_executor_"
-      sb.append(s"${prefix}rddBlocks_Count ${executor.rddBlocks}\n")
-      sb.append(s"${prefix}memoryUsed_Count ${executor.memoryUsed}\n")
-      sb.append(s"${prefix}diskUsed_Count ${executor.diskUsed}\n")
-      sb.append(s"${prefix}totalCores_Count ${executor.totalCores}\n")
-      sb.append(s"${prefix}maxTasks_Count ${executor.maxTasks}\n")
-      sb.append(s"${prefix}activeTasks_Count ${executor.activeTasks}\n")
-      sb.append(s"${prefix}failedTasks_Count ${executor.failedTasks}\n")
-      sb.append(s"${prefix}completedTasks_Count ${executor.completedTasks}\n")
-      sb.append(s"${prefix}totalTasks_Count ${executor.totalTasks}\n")
-      sb.append(s"${prefix}totalDuration_Value ${executor.totalDuration}\n")
-      sb.append(s"${prefix}totalGCTime_Value ${executor.totalGCTime}\n")
-      sb.append(s"${prefix}totalInputBytes_Count ${executor.totalInputBytes}\n")
-      sb.append(s"${prefix}totalShuffleRead_Count ${executor.totalShuffleRead}\n")
-      sb.append(s"${prefix}totalShuffleWrite_Count ${executor.totalShuffleWrite}\n")
-      sb.append(s"${prefix}maxMemory_Count ${executor.maxMemory}\n")
+      val prefix = "metrics_executor_"
+      val labels = Seq(
+        "application_id" -> store.applicationInfo.id,
+        "application_name" -> store.applicationInfo.name,
+        "executor_id" -> executor.id
+      ).map { case (k, v) => s"""$k="$v"""" }.mkString("{", ", ", "}")
+      sb.append(s"${prefix}rddBlocks_Count$labels ${executor.rddBlocks}\n")
+      sb.append(s"${prefix}memoryUsed_Count$labels ${executor.memoryUsed}\n")
+      sb.append(s"${prefix}diskUsed_Count$labels ${executor.diskUsed}\n")
+      sb.append(s"${prefix}totalCores_Count$labels ${executor.totalCores}\n")
+      sb.append(s"${prefix}maxTasks_Count$labels ${executor.maxTasks}\n")
+      sb.append(s"${prefix}activeTasks_Count$labels ${executor.activeTasks}\n")
+      sb.append(s"${prefix}failedTasks_Count$labels ${executor.failedTasks}\n")
+      sb.append(s"${prefix}completedTasks_Count$labels ${executor.completedTasks}\n")
+      sb.append(s"${prefix}totalTasks_Count$labels ${executor.totalTasks}\n")
+      sb.append(s"${prefix}totalDuration_Value$labels ${executor.totalDuration}\n")
+      sb.append(s"${prefix}totalGCTime_Value$labels ${executor.totalGCTime}\n")
+      sb.append(s"${prefix}totalInputBytes_Count$labels ${executor.totalInputBytes}\n")
+      sb.append(s"${prefix}totalShuffleRead_Count$labels ${executor.totalShuffleRead}\n")
+      sb.append(s"${prefix}totalShuffleWrite_Count$labels ${executor.totalShuffleWrite}\n")
+      sb.append(s"${prefix}maxMemory_Count$labels ${executor.maxMemory}\n")
       executor.executorLogs.foreach { case (k, v) => }
       executor.memoryMetrics.foreach { m =>
-        sb.append(s"${prefix}usedOnHeapStorageMemory_Count ${m.usedOnHeapStorageMemory}\n")
-        sb.append(s"${prefix}usedOffHeapStorageMemory_Count ${m.usedOffHeapStorageMemory}\n")
-        sb.append(s"${prefix}totalOnHeapStorageMemory_Count ${m.totalOnHeapStorageMemory}\n")
-        sb.append(s"${prefix}totalOffHeapStorageMemory_Count ${m.totalOffHeapStorageMemory}\n")
+        sb.append(s"${prefix}usedOnHeapStorageMemory_Count$labels ${m.usedOnHeapStorageMemory}\n")
+        sb.append(s"${prefix}usedOffHeapStorageMemory_Count$labels ${m.usedOffHeapStorageMemory}\n")
+        sb.append(s"${prefix}totalOnHeapStorageMemory_Count$labels ${m.totalOnHeapStorageMemory}\n")
+        sb.append(s"${prefix}totalOffHeapStorageMemory_Count$labels " +
+          s"${m.totalOffHeapStorageMemory}\n")
       }
       executor.peakMemoryMetrics.foreach { m =>
         val names = Array(
@@ -89,7 +94,7 @@ private[v1] class PrometheusResource extends ApiRequestContext {
           "MajorGCTime"
         )
         names.foreach { name =>
-          sb.append(s"$prefix${name}_Count ${m.getMetricValue(name)}\n")
+          sb.append(s"$prefix${name}_Count$labels ${m.getMetricValue(name)}\n")
        }
      }
    }