[SPARK-1510] Spark Streaming metrics source for metrics system

This pulls in changes made by @jerryshao in https://github.com/apache/spark/pull/424 and merges with the master.

Author: jerryshao <saisai.shao@intel.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #545 from tdas/streaming-metrics and squashes the following commits:

034b443 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-metrics
fb3b0a5 [jerryshao] Modify according master update
21939f5 [jerryshao] Style changes according to style check error
976116b [jerryshao] Add StreamSource in StreamingContext for better monitoring through metrics system
This commit is contained in:
jerryshao 2014-04-24 18:56:57 -07:00 committed by Tathagata Das
parent 44da5ab2de
commit 80429f3e2a
3 changed files with 79 additions and 1 deletion

View file

@ -154,6 +154,10 @@ class StreamingContext private[streaming] (
private[streaming] val uiTab = new StreamingTab(this)
/** Register streaming source to metrics system */
private val streamingSource = new StreamingSource(this)
SparkEnv.get.metricsSystem.registerSource(streamingSource)
/** Enumeration to identify current state of the StreamingContext */
private[streaming] object StreamingContextState extends Enumeration {
type CheckpointState = Value

View file

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source
import org.apache.spark.streaming.ui.StreamingJobProgressListener
/**
 * Metrics source that exposes a StreamingContext's batch/receiver statistics
 * (read from the UI progress listener) to Spark's metrics system.
 */
private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
  val metricRegistry = new MetricRegistry
  val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName)

  // Reuse the listener already registered by the streaming UI tab so both
  // report from the same data.
  val streamingListener = ssc.uiTab.listener

  /**
   * Registers a gauge named "streaming.<name>" whose value is read from the
   * progress listener on each poll. A null reading is replaced by
   * `defaultValue` so the metrics backend never sees null.
   */
  private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
      defaultValue: T) {
    metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
      override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
    })
  }

  // Gauge for number of network receivers
  registerGauge("receivers", _.numReceivers, 0)

  // Gauge for number of total completed batches
  registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)

  // Gauge for number of unprocessed batches
  registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)

  // Gauge for number of waiting batches
  registerGauge("waitingBatches", _.waitingBatches.size, 0L)

  // Gauge for number of running batches
  registerGauge("runningBatches", _.runningBatches.size, 0L)

  // Gauge for number of retained completed batches
  registerGauge("retainedCompletedBatches", _.retainedCompletedBatches.size, 0L)

  // Gauges for the last completed batch, useful for monitoring the streaming
  // job's running status; each displays -1 for any abnormal condition.
  registerGauge("lastCompletedBatch_submissionTime",
    _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
  registerGauge("lastCompletedBatch_processStartTime",
    _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
  registerGauge("lastCompletedBatch_processEndTime",
    _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)

  // Gauges for the last received batch, useful for monitoring the streaming
  // job's running status; each displays -1 for any abnormal condition.
  // Bug fix: these previously read lastCompletedBatch (copy-paste error),
  // so the lastReceivedBatch_* metrics merely duplicated the
  // lastCompletedBatch_* values instead of tracking the last received batch.
  registerGauge("lastReceivedBatch_submissionTime",
    _.lastReceivedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
  registerGauge("lastReceivedBatch_processStartTime",
    _.lastReceivedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
  registerGauge("lastReceivedBatch_processEndTime",
    _.lastReceivedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
}

View file

@ -28,7 +28,8 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
import org.apache.spark.util.Distribution
private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends StreamingListener {
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
extends StreamingListener {
private val waitingBatchInfos = new HashMap[Time, BatchInfo]
private val runningBatchInfos = new HashMap[Time, BatchInfo]