remedy the line-wrap while exceeding 100 chars
This commit is contained in:
parent
4b68be5f3c
commit
892fb8ffa8
|
@ -31,12 +31,13 @@ class ApplicationSource(val application: ApplicationInfo) extends Source {
|
|||
override def getValue: String = application.state.toString
|
||||
})
|
||||
|
||||
metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), new Gauge[Long] {
|
||||
override def getValue: Long = application.duration
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")),
|
||||
new Gauge[Long] { override def getValue: Long = application.duration })
|
||||
|
||||
metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), new Gauge[Int] {
|
||||
override def getValue: Int = application.coresGranted
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")),
|
||||
new Gauge[Int] { override def getValue: Int = application.coresGranted })
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -27,17 +27,18 @@ private[spark] class MasterSource(val master: Master) extends Source {
|
|||
val sourceName = "master"
|
||||
|
||||
// Gauge for worker numbers in cluster
|
||||
metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), new Gauge[Int] {
|
||||
override def getValue: Int = master.workers.size
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name(NamingConventions.makeMetricName("workers","number")),
|
||||
new Gauge[Int] { override def getValue: Int = master.workers.size })
|
||||
|
||||
// Gauge for application numbers in cluster
|
||||
metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), new Gauge[Int] {
|
||||
override def getValue: Int = master.apps.size
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")),
|
||||
new Gauge[Int] { override def getValue: Int = master.apps.size })
|
||||
|
||||
// Gauge for waiting application numbers in cluster
|
||||
metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), new Gauge[Int] {
|
||||
override def getValue: Int = master.waitingApps.size
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")),
|
||||
new Gauge[Int] { override def getValue: Int = master.waitingApps.size })
|
||||
}
|
||||
|
||||
|
|
|
@ -26,27 +26,28 @@ private[spark] class WorkerSource(val worker: Worker) extends Source {
|
|||
val sourceName = "worker"
|
||||
val metricRegistry = new MetricRegistry()
|
||||
|
||||
metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), new Gauge[Int] {
|
||||
override def getValue: Int = worker.executors.size
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")),
|
||||
new Gauge[Int] { override def getValue: Int = worker.executors.size })
|
||||
|
||||
// Gauge for cores used of this worker
|
||||
metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), new Gauge[Int] {
|
||||
override def getValue: Int = worker.coresUsed
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")),
|
||||
new Gauge[Int] { override def getValue: Int = worker.coresUsed })
|
||||
|
||||
// Gauge for memory used of this worker
|
||||
metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Int] {
|
||||
override def getValue: Int = worker.memoryUsed
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")),
|
||||
new Gauge[Int] { override def getValue: Int = worker.memoryUsed })
|
||||
|
||||
// Gauge for cores free of this worker
|
||||
metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), new Gauge[Int] {
|
||||
override def getValue: Int = worker.coresFree
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")),
|
||||
new Gauge[Int] { override def getValue: Int = worker.coresFree })
|
||||
|
||||
// Gauge for memory free of this worker
|
||||
metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")), new Gauge[Int] {
|
||||
override def getValue: Int = worker.memoryFree
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")),
|
||||
new Gauge[Int] { override def getValue: Int = worker.memoryFree })
|
||||
}
|
||||
|
||||
|
|
|
@ -44,31 +44,42 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
|
|||
val sourceName = "executor.%s".format(executorId)
|
||||
|
||||
// Gauge for executor thread pool's actively executing task counts
|
||||
metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), new Gauge[Int] {
|
||||
override def getValue: Int = executor.threadPool.getActiveCount()
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")),
|
||||
new Gauge[Int] { override def getValue: Int = executor.threadPool.getActiveCount() })
|
||||
|
||||
// Gauge for executor thread pool's approximate total number of tasks that have been completed
|
||||
metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), new Gauge[Long] {
|
||||
override def getValue: Long = executor.threadPool.getCompletedTaskCount()
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")),
|
||||
new Gauge[Long] { override def getValue: Long = executor.threadPool.getCompletedTaskCount() })
|
||||
|
||||
// Gauge for executor thread pool's current number of threads
|
||||
metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), new Gauge[Int] {
|
||||
override def getValue: Int = executor.threadPool.getPoolSize()
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")),
|
||||
new Gauge[Int] { override def getValue: Int = executor.threadPool.getPoolSize() })
|
||||
|
||||
// Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
|
||||
metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), new Gauge[Int] {
|
||||
override def getValue: Int = executor.threadPool.getMaximumPoolSize()
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")),
|
||||
new Gauge[Int] { override def getValue: Int = executor.threadPool.getMaximumPoolSize() })
|
||||
|
||||
// Gauge for file system stats of this executor
|
||||
for (scheme <- Array("hdfs", "file")) {
|
||||
registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "bytes"), _.getBytesRead(), 0L)
|
||||
registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "bytes"), _.getBytesWritten(), 0L)
|
||||
registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "ops"), _.getReadOps(), 0)
|
||||
registerFileSystemStat(scheme, NamingConventions.makeMetricName("largeRead", "ops"), _.getLargeReadOps(), 0)
|
||||
registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "ops"), _.getWriteOps(), 0)
|
||||
registerFileSystemStat(scheme,
|
||||
NamingConventions.makeMetricName("read", "bytes"),
|
||||
_.getBytesRead(), 0L)
|
||||
registerFileSystemStat(scheme,
|
||||
NamingConventions.makeMetricName("write", "bytes"),
|
||||
_.getBytesWritten(), 0L)
|
||||
registerFileSystemStat(scheme,
|
||||
NamingConventions.makeMetricName("read", "ops"),
|
||||
_.getReadOps(), 0)
|
||||
registerFileSystemStat(scheme,
|
||||
NamingConventions.makeMetricName("largeRead", "ops"),
|
||||
_.getLargeReadOps(), 0)
|
||||
registerFileSystemStat(scheme,
|
||||
NamingConventions.makeMetricName("write", "ops"),
|
||||
_.getWriteOps(), 0)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -28,23 +28,24 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: Spar
|
|||
val metricRegistry = new MetricRegistry()
|
||||
val sourceName = "%s.DAGScheduler".format(sc.appName)
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")), new Gauge[Int] {
|
||||
override def getValue: Int = dagScheduler.failed.size
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")),
|
||||
new Gauge[Int] { override def getValue: Int = dagScheduler.failed.size })
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")), new Gauge[Int] {
|
||||
override def getValue: Int = dagScheduler.running.size
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")),
|
||||
new Gauge[Int] { override def getValue: Int = dagScheduler.running.size })
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), new Gauge[Int] {
|
||||
override def getValue: Int = dagScheduler.waiting.size
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")),
|
||||
new Gauge[Int] { override def getValue: Int = dagScheduler.waiting.size })
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), new Gauge[Int] {
|
||||
override def getValue: Int = dagScheduler.nextJobId.get()
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")),
|
||||
new Gauge[Int] { override def getValue: Int = dagScheduler.nextJobId.get() })
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), new Gauge[Int] {
|
||||
override def getValue: Int = dagScheduler.activeJobs.size
|
||||
})
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")),
|
||||
new Gauge[Int] { override def getValue: Int = dagScheduler.activeJobs.size })
|
||||
}
|
||||
|
||||
|
|
|
@ -29,40 +29,48 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
|
|||
val metricRegistry = new MetricRegistry()
|
||||
val sourceName = "%s.BlockManager".format(sc.appName)
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), new Gauge[Long] {
|
||||
override def getValue: Long = {
|
||||
val storageStatusList = blockManager.master.getStorageStatus
|
||||
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
|
||||
maxMem / 1024 / 1024
|
||||
}
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")),
|
||||
new Gauge[Long] {
|
||||
override def getValue: Long = {
|
||||
val storageStatusList = blockManager.master.getStorageStatus
|
||||
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
|
||||
maxMem / 1024 / 1024
|
||||
}
|
||||
})
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), new Gauge[Long] {
|
||||
override def getValue: Long = {
|
||||
val storageStatusList = blockManager.master.getStorageStatus
|
||||
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
|
||||
remainingMem / 1024 / 1024
|
||||
}
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")),
|
||||
new Gauge[Long] {
|
||||
override def getValue: Long = {
|
||||
val storageStatusList = blockManager.master.getStorageStatus
|
||||
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
|
||||
remainingMem / 1024 / 1024
|
||||
}
|
||||
})
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Long] {
|
||||
override def getValue: Long = {
|
||||
val storageStatusList = blockManager.master.getStorageStatus
|
||||
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
|
||||
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
|
||||
(maxMem - remainingMem) / 1024 / 1024
|
||||
}
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")),
|
||||
new Gauge[Long] {
|
||||
override def getValue: Long = {
|
||||
val storageStatusList = blockManager.master.getStorageStatus
|
||||
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
|
||||
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
|
||||
(maxMem - remainingMem) / 1024 / 1024
|
||||
}
|
||||
})
|
||||
|
||||
metricRegistry.register(MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] {
|
||||
override def getValue: Long = {
|
||||
val storageStatusList = blockManager.master.getStorageStatus
|
||||
val diskSpaceUsed = storageStatusList
|
||||
.flatMap(_.blocks.values.map(_.diskSize))
|
||||
.reduceOption(_ + _)
|
||||
.getOrElse(0L)
|
||||
|
||||
diskSpaceUsed / 1024 / 1024
|
||||
}
|
||||
metricRegistry.register(
|
||||
MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] {
|
||||
override def getValue: Long = {
|
||||
val storageStatusList = blockManager.master.getStorageStatus
|
||||
val diskSpaceUsed = storageStatusList
|
||||
.flatMap(_.blocks.values.map(_.diskSize))
|
||||
.reduceOption(_ + _)
|
||||
.getOrElse(0L)
|
||||
|
||||
diskSpaceUsed / 1024 / 1024
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue