create metrics name manually.
This commit is contained in:
parent
188abbf8f1
commit
22bed59d2d
|
@ -34,7 +34,7 @@ class ApplicationSource(val application: ApplicationInfo) extends Source {
|
|||
override def getValue: Long = application.duration
|
||||
})
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("cores_number"), new Gauge[Int] {
|
||||
override def getValue: Int = application.coresGranted
|
||||
})
|
||||
|
||||
|
|
|
@ -26,17 +26,17 @@ private[spark] class MasterSource(val master: Master) extends Source {
|
|||
val sourceName = "master"
|
||||
|
||||
// Gauge for worker numbers in cluster
|
||||
metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("workers_number"), new Gauge[Int] {
|
||||
override def getValue: Int = master.workers.size
|
||||
})
|
||||
|
||||
// Gauge for application numbers in cluster
|
||||
metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("apps_number"), new Gauge[Int] {
|
||||
override def getValue: Int = master.apps.size
|
||||
})
|
||||
|
||||
// Gauge for waiting application numbers in cluster
|
||||
metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("waitingApps_number"), new Gauge[Int] {
|
||||
override def getValue: Int = master.waitingApps.size
|
||||
})
|
||||
}
|
||||
|
|
|
@ -25,27 +25,27 @@ private[spark] class WorkerSource(val worker: Worker) extends Source {
|
|||
val sourceName = "worker"
|
||||
val metricRegistry = new MetricRegistry()
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("executors_number"), new Gauge[Int] {
|
||||
override def getValue: Int = worker.executors.size
|
||||
})
|
||||
|
||||
// Gauge for cores used of this worker
|
||||
metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("coresUsed_number"), new Gauge[Int] {
|
||||
override def getValue: Int = worker.coresUsed
|
||||
})
|
||||
|
||||
// Gauge for memory used of this worker
|
||||
metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("memUsed_MBytes"), new Gauge[Int] {
|
||||
override def getValue: Int = worker.memoryUsed
|
||||
})
|
||||
|
||||
// Gauge for cores free of this worker
|
||||
metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("coresFree_number"), new Gauge[Int] {
|
||||
override def getValue: Int = worker.coresFree
|
||||
})
|
||||
|
||||
// Gauge for memory free of this worker
|
||||
metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("memFree_MBytes"), new Gauge[Int] {
|
||||
override def getValue: Int = worker.memoryFree
|
||||
})
|
||||
}
|
||||
|
|
|
@ -43,31 +43,31 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
|
|||
val sourceName = "executor.%s".format(executorId)
|
||||
|
||||
// Gauge for executor thread pool's actively executing task counts
|
||||
metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("threadpool", "activeTask_count"), new Gauge[Int] {
|
||||
override def getValue: Int = executor.threadPool.getActiveCount()
|
||||
})
|
||||
|
||||
// Gauge for executor thread pool's approximate total number of tasks that have been completed
|
||||
metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
|
||||
metricRegistry.register(MetricRegistry.name("threadpool", "completeTask_count"), new Gauge[Long] {
|
||||
override def getValue: Long = executor.threadPool.getCompletedTaskCount()
|
||||
})
|
||||
|
||||
// Gauge for executor thread pool's current number of threads
|
||||
metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
|
||||
override def getValue: Int = executor.threadPool.getPoolSize()
|
||||
})
|
||||
|
||||
// Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
|
||||
metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
|
||||
override def getValue: Int = executor.threadPool.getMaximumPoolSize()
|
||||
})
|
||||
|
||||
// Gauge for file system stats of this executor
|
||||
for (scheme <- Array("hdfs", "file")) {
|
||||
registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L)
|
||||
registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L)
|
||||
registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0)
|
||||
registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0)
|
||||
registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0)
|
||||
registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
|
||||
registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
|
||||
registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
|
||||
registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
|
||||
registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,23 +27,23 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: Spar
|
|||
val metricRegistry = new MetricRegistry()
|
||||
val sourceName = "%s.DAGScheduler".format(sc.appName)
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("stage", "failedStages_number"), new Gauge[Int] {
|
||||
override def getValue: Int = dagScheduler.failed.size
|
||||
})
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("stage", "runningStages_number"), new Gauge[Int] {
|
||||
override def getValue: Int = dagScheduler.running.size
|
||||
})
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("stage", "waitingStages_number"), new Gauge[Int] {
|
||||
override def getValue: Int = dagScheduler.waiting.size
|
||||
})
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("job", "allJobs_number"), new Gauge[Int] {
|
||||
override def getValue: Int = dagScheduler.nextJobId.get()
|
||||
})
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
|
||||
metricRegistry.register(MetricRegistry.name("job", "activeJobs_number"), new Gauge[Int] {
|
||||
override def getValue: Int = dagScheduler.activeJobs.size
|
||||
})
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
|
|||
val metricRegistry = new MetricRegistry()
|
||||
val sourceName = "%s.BlockManager".format(sc.appName)
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
|
||||
metricRegistry.register(MetricRegistry.name("memory", "maxMem_MBytes"), new Gauge[Long] {
|
||||
override def getValue: Long = {
|
||||
val storageStatusList = blockManager.master.getStorageStatus
|
||||
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
|
||||
|
@ -36,7 +36,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
|
|||
}
|
||||
})
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
|
||||
metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MBytes"), new Gauge[Long] {
|
||||
override def getValue: Long = {
|
||||
val storageStatusList = blockManager.master.getStorageStatus
|
||||
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
|
||||
|
@ -44,7 +44,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
|
|||
}
|
||||
})
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
|
||||
metricRegistry.register(MetricRegistry.name("memory", "memUsed_MBytes"), new Gauge[Long] {
|
||||
override def getValue: Long = {
|
||||
val storageStatusList = blockManager.master.getStorageStatus
|
||||
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
|
||||
|
@ -53,7 +53,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
|
|||
}
|
||||
})
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
|
||||
metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MBytes"), new Gauge[Long] {
|
||||
override def getValue: Long = {
|
||||
val storageStatusList = blockManager.master.getStorageStatus
|
||||
val diskSpaceUsed = storageStatusList
|
||||
|
|
Loading…
Reference in a new issue