[SPARK-22975][SS] MetricsReporter should not throw exception when there was no progress reported

## What changes were proposed in this pull request?

`MetricsReporter` assumes that the query has already reported some progress, i.e. that `lastProgress` is not null. If the query has not yet made any progress, as happens before the first trigger completes, a `NullPointerException` can be thrown when the metrics are read.

The PR checks whether a `lastProgress` is available and, if it is not, returns a default value for each metric (0.0 for the rates, 0 for the latency).
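As a rough illustration of the change, here is a minimal, self-contained sketch (not Spark's actual `MetricsReporter`; `Progress`, `lastProgress`, `unsafeGauge`, and `safeGauge` are local stand-ins for `StreamingQueryProgress`, `StreamingQuery.lastProgress`, and the pre-/post-fix gauge closures):

```scala
object GaugeDefaultSketch {
  // Stand-in for StreamingQueryProgress: only the field the gauge reads.
  final case class Progress(inputRowsPerSecond: Double)

  // Stand-in for StreamingQuery.lastProgress, which is null before the
  // first trigger completes.
  @volatile var lastProgress: Progress = null

  // Pre-fix shape: the closure dereferences lastProgress unconditionally,
  // so calling it before any progress throws a NullPointerException.
  val unsafeGauge: () => Double = () => lastProgress.inputRowsPerSecond

  // Post-fix shape: take a function of the progress plus a default, and
  // fall back to the default while lastProgress is still null.
  def safeGauge[T](f: Progress => T, default: T): () => T =
    () => Option(lastProgress).map(f).getOrElse(default)

  def main(args: Array[String]): Unit = {
    val inputRate = safeGauge(_.inputRowsPerSecond, 0.0)
    println(inputRate()) // 0.0 instead of an NPE
    lastProgress = Progress(inputRowsPerSecond = 42.0)
    println(inputRate()) // 42.0 once progress exists
  }
}
```

With the old shape there is no such fallback: the `NullPointerException` surfaces as soon as the metrics system polls the gauge.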

## How was this patch tested?

Added a unit test covering the case where no progress has been reported yet.

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20189 from mgaido91/SPARK-22975.
Marco Gaido authored on 2018-01-12 11:25:37 -08:00; committed by Shixiong Zhu
parent 7bd14cfd40
commit 54277398af
2 changed files with 32 additions and 10 deletions

`sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala`

```diff
@@ -17,15 +17,11 @@

 package org.apache.spark.sql.execution.streaming

-import java.{util => ju}
-
-import scala.collection.mutable
-
 import com.codahale.metrics.{Gauge, MetricRegistry}

 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.{Source => CodahaleSource}
-import org.apache.spark.util.Clock
+import org.apache.spark.sql.streaming.StreamingQueryProgress

 /**
  * Serves metrics from a [[org.apache.spark.sql.streaming.StreamingQuery]] to
```
```diff
@@ -39,14 +35,17 @@ class MetricsReporter(

   // Metric names should not have . in them, so that all the metrics of a query are identified
   // together in Ganglia as a single metric group
-  registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond)
-  registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond)
-  registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue())
+  registerGauge("inputRate-total", _.inputRowsPerSecond, 0.0)
+  registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
+  registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L)

-  private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
+  private def registerGauge[T](
+      name: String,
+      f: StreamingQueryProgress => T,
+      default: T): Unit = {
     synchronized {
       metricRegistry.register(name, new Gauge[T] {
-        override def getValue: T = f()
+        override def getValue: T = Option(stream.lastProgress).map(f).getOrElse(default)
       })
     }
   }
```
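Note the design choice: the null handling now lives in a single place (`getValue`), via `Option(stream.lastProgress).map(f).getOrElse(default)`, so each metric registration supplies only a pure `StreamingQueryProgress => T` accessor plus its default, instead of repeating the null check in every gauge closure.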

`sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala`

```diff
@@ -424,6 +424,29 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }

+  test("SPARK-22975: MetricsReporter defaults when there was no progress reported") {
+    withSQLConf("spark.sql.streaming.metricsEnabled" -> "true") {
+      BlockingSource.latch = new CountDownLatch(1)
+      withTempDir { tempDir =>
+        val sq = spark.readStream
+          .format("org.apache.spark.sql.streaming.util.BlockingSource")
+          .load()
+          .writeStream
+          .format("org.apache.spark.sql.streaming.util.BlockingSource")
+          .option("checkpointLocation", tempDir.toString)
+          .start()
+          .asInstanceOf[StreamingQueryWrapper]
+          .streamingQuery
+
+        val gauges = sq.streamMetrics.metricRegistry.getGauges
+        assert(gauges.get("latency").getValue.asInstanceOf[Long] == 0)
+        assert(gauges.get("processingRate-total").getValue.asInstanceOf[Double] == 0.0)
+        assert(gauges.get("inputRate-total").getValue.asInstanceOf[Double] == 0.0)
+        sq.stop()
+      }
+    }
+  }
+
   test("input row calculation with mixed batch and streaming sources") {
     val streamingTriggerDF = spark.createDataset(1 to 10).toDF
     val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")
```