[SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay

### What changes were proposed in this pull request?
This pull request proposes a new API for streaming sources to signal that they can report metrics, and adds a use case in which the Kafka micro-batch stream reports statistics on how many offsets the current offset falls behind the latest available offset.

A public interface, `ReportsSourceMetrics`, is added with a single method:

`metrics`: returns the metrics reported by the streaming source with respect to the given (latest consumed) offset.
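
As a rough sketch (not part of this PR's diff) of how a custom source could opt in: the `ExampleStream` class and its `recordsBehind` helper below are hypothetical; only `ReportsSourceMetrics` and the `metrics(Optional[Offset])` signature come from this change.

```scala
import java.{util => ju}
import java.util.Optional

import scala.collection.JavaConverters._

import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReportsSourceMetrics}

// Hypothetical micro-batch source that mixes in the new interface.
abstract class ExampleStream extends MicroBatchStream with ReportsSourceMetrics {

  // Assumed helper: how many records the given consumed offset lags behind the source.
  protected def recordsBehind(latestConsumedOffset: Option[Offset]): Long

  override def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String] = {
    val behind = recordsBehind(Option(latestConsumedOffset.orElse(null)))
    Map("recordsBehindLatest" -> behind.toString).asJava
  }
}
```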

### Why are the changes needed?
The new API lets streaming sources expose custom metrics for the "current" offset. Unlike #31398, this PR makes the metrics available to users through the progress report rather than the Spark UI. A typical use case is knowing how far the current offset falls behind the latest available offset.
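
For example, the per-source metrics can then be read off the progress report of a running query. A minimal sketch, assuming a `StreamingQuery` backed by a source that implements `ReportsSourceMetrics`; the helper name is made up, and `maxOffsetsBehindLatest` is the Kafka-specific key added by this PR:

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Returns the Kafka source's maximum lag from the most recent progress, if reported.
def maxOffsetsBehindLatest(query: StreamingQuery): Option[Long] =
  for {
    progress <- Option(query.lastProgress)            // null until the first trigger completes
    source   <- progress.sources.headOption           // SourceProgress of the (single) source
    value    <- Option(source.metrics.get("maxOffsetsBehindLatest")) // absent for other sources
  } yield value.toLong
```

The same keys (`minOffsetsBehindLatest`, `maxOffsetsBehindLatest`, `avgOffsetsBehindLatest`) also appear under `sources[].metrics` in the progress JSON.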

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit tests for the Kafka micro-batch source v2 are added to cover the Kafka use case.

Closes #31944 from yijiacui-db/SPARK-34297.

Authored-by: Yijia Cui <yijia.cui@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Yijia Cui 2021-05-05 17:26:07 +09:00 committed by Jungtaek Lim
parent f550e03b96
commit bbdbe0f734
6 changed files with 238 additions and 28 deletions

@@ -18,13 +18,16 @@
package org.apache.spark.sql.kafka010
import java.{util => ju}
import java.util.Optional
import scala.collection.JavaConverters._
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, ReportsSourceMetrics, SupportsAdmissionControl}
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.UninterruptibleThread
@@ -51,7 +54,8 @@ private[kafka010] class KafkaMicroBatchStream(
options: CaseInsensitiveStringMap,
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean) extends SupportsAdmissionControl with MicroBatchStream with Logging {
failOnDataLoss: Boolean)
extends SupportsAdmissionControl with ReportsSourceMetrics with MicroBatchStream with Logging {
private[kafka010] val pollTimeoutMs = options.getLong(
KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
@@ -133,6 +137,10 @@ private[kafka010] class KafkaMicroBatchStream(
override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
override def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String] = {
KafkaMicroBatchStream.metrics(latestConsumedOffset, latestPartitionOffsets)
}
/**
* Read initial partition offsets from the checkpoint, or decide the offsets and write them to
* the checkpoint.
@@ -218,3 +226,37 @@ private[kafka010] class KafkaMicroBatchStream(
}
}
}
object KafkaMicroBatchStream extends Logging {
/**
* Compute the difference of offset per partition between latestAvailablePartitionOffsets
* and partition offsets in the latestConsumedOffset.
* Report min/max/avg offsets behind the latest for all the partitions in the Kafka stream.
*
* Because of rate limit, latest consumed offset per partition can be smaller than
* the latest available offset per partition.
* @param latestConsumedOffset latest consumed offset
* @param latestAvailablePartitionOffsets latest available offset per partition
* @return the generated metrics map
*/
def metrics(
latestConsumedOffset: Optional[Offset],
latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
val offset = Option(latestConsumedOffset.orElse(null))
if (offset.nonEmpty && latestAvailablePartitionOffsets != null) {
val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets
val offsetsBehindLatest = latestAvailablePartitionOffsets
.map(partitionOffset => partitionOffset._2 - consumedPartitionOffsets(partitionOffset._1))
if (offsetsBehindLatest.nonEmpty) {
val avgOffsetBehindLatest = offsetsBehindLatest.sum.toDouble / offsetsBehindLatest.size
return Map[String, String](
"minOffsetsBehindLatest" -> offsetsBehindLatest.min.toString,
"maxOffsetsBehindLatest" -> offsetsBehindLatest.max.toString,
"avgOffsetsBehindLatest" -> avgOffsetBehindLatest.toString).asJava
}
}
ju.Collections.emptyMap()
}
}

@@ -19,6 +19,7 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.connector.read.streaming
import org.apache.spark.sql.connector.read.streaming.PartitionOffset
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
@@ -62,4 +63,17 @@ private[kafka010] object KafkaSourceOffset {
*/
def apply(offset: SerializedOffset): KafkaSourceOffset =
KafkaSourceOffset(JsonUtils.partitionOffsets(offset.json))
/**
* Returns [[KafkaSourceOffset]] from a streaming.Offset
*/
def apply(offset: streaming.Offset): KafkaSourceOffset = {
offset match {
case k: KafkaSourceOffset => k
case so: SerializedOffset => apply(so)
case _ =>
throw new IllegalArgumentException(
s"Invalid conversion from offset of ${offset.getClass} to KafkaSourceOffset")
}
}
}

@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
import java.io._
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Paths}
import java.util.Locale
import java.util.{Locale, Optional}
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
@@ -1297,6 +1297,112 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
CheckNewAnswer(32, 33, 34, 35, 36)
)
}
test("test custom metrics - with rate limit") {
import testImplicits._
val topic = newTopic()
val data = 1 to 10
testUtils.createTopic(topic, partitions = 2)
testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
val kafka = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", topic)
.option("maxOffsetsPerTrigger", 1)
.option(STARTING_OFFSETS_OPTION_KEY, "earliest")
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.map(_.toInt)
testStream(kafka)(
StartStream(),
makeSureGetOffsetCalled,
CheckAnswer(data: _*),
Execute { query =>
// The rate limit is 1, so there must be some delay in offsets per partition.
val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find { progress =>
// Find a batch whose metrics report an average offsetsBehindLatest greater than 0.
!progress.metrics.isEmpty && progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
}
assert(progressWithDelay.nonEmpty)
val metrics = progressWithDelay.get.metrics
assert(metrics.keySet() ===
Set("minOffsetsBehindLatest",
"maxOffsetsBehindLatest",
"avgOffsetsBehindLatest").asJava)
assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
}
)
}
test("test custom metrics - no rate limit") {
import testImplicits._
val topic = newTopic()
val data = 1 to 10
testUtils.createTopic(topic, partitions = 2)
testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
val kafka = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", topic)
.option(STARTING_OFFSETS_OPTION_KEY, "earliest")
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.map(_.toInt)
testStream(kafka)(
StartStream(),
makeSureGetOffsetCalled,
CheckAnswer(data: _*),
Execute { query =>
val progress = query.recentProgress.map(_.sources.head).lastOption
assert(progress.nonEmpty)
val metrics = progress.get.metrics
// When there is no rate limit, there shouldn't be any delay in the current stream.
assert(metrics.keySet() ===
Set("minOffsetsBehindLatest",
"maxOffsetsBehindLatest",
"avgOffsetsBehindLatest").asJava)
assert(metrics.get("minOffsetsBehindLatest").toLong === 0)
assert(metrics.get("maxOffsetsBehindLatest").toLong === 0)
assert(metrics.get("avgOffsetsBehindLatest").toDouble === 0)
}
)
}
test("test custom metrics - corner cases") {
val topicPartition1 = new TopicPartition(newTopic(), 0)
val topicPartition2 = new TopicPartition(newTopic(), 0)
val latestOffset = Map[TopicPartition, Long]((topicPartition1, 3L), (topicPartition2, 6L))
// test empty offset.
assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset).isEmpty)
// test valid offsetsBehindLatest
val offset = KafkaSourceOffset(
Map[TopicPartition, Long]((topicPartition1, 1L), (topicPartition2, 2L)))
assert(
KafkaMicroBatchStream.metrics(Optional.ofNullable(offset), latestOffset) ===
Map[String, String](
"minOffsetsBehindLatest" -> "2",
"maxOffsetsBehindLatest" -> "4",
"avgOffsetsBehindLatest" -> "3.0").asJava)
// test null latestAvailablePartitionOffsets
assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(offset), null).isEmpty)
}
}
abstract class KafkaSourceSuiteBase extends KafkaSourceTest {

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connector.read.streaming;
import java.util.Map;
import java.util.Optional;
import org.apache.spark.annotation.Evolving;
/**
* A mix-in interface for {@link SparkDataStream} streaming sources to signal that they can report
* metrics.
*/
@Evolving
public interface ReportsSourceMetrics extends SparkDataStream {
/**
* Returns the metrics reported by the streaming source with respect to
* the latest consumed offset.
*
* @param latestConsumedOffset the end offset (exclusive) of the latest triggered batch.
*/
Map<String, String> metrics(Optional<Offset> latestConsumedOffset);
}

@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.streaming
import java.text.SimpleDateFormat
import java.util.{Date, UUID}
import java.util.{Date, Optional, UUID}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalP
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_SECOND
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, SparkDataStream}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, ReportsSourceMetrics, SparkDataStream}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2Relation, StreamWriterCommitProgress}
import org.apache.spark.sql.streaming._
@@ -101,6 +101,8 @@ trait ProgressReporter extends Logging {
isTriggerActive = false)
}
private var latestStreamProgress: StreamProgress = _
/** Returns the current status of the query. */
def status: StreamingQueryStatus = currentStatus
@@ -136,6 +138,7 @@ trait ProgressReporter extends Logging {
currentTriggerStartOffsets = from.mapValues(_.json).toMap
currentTriggerEndOffsets = to.mapValues(_.json).toMap
currentTriggerLatestOffsets = latest.mapValues(_.json).toMap
latestStreamProgress = to
}
private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
@@ -175,6 +178,11 @@ trait ProgressReporter extends Logging {
val sourceProgress = sources.distinct.map { source =>
val numRecords = executionStats.inputRows.getOrElse(source, 0L)
val sourceMetrics = source match {
case withMetrics: ReportsSourceMetrics =>
withMetrics.metrics(Optional.ofNullable(latestStreamProgress.get(source).orNull))
case _ => Map[String, String]().asJava
}
new SourceProgress(
description = source.toString,
startOffset = currentTriggerStartOffsets.get(source).orNull,
@@ -182,7 +190,8 @@
latestOffset = currentTriggerLatestOffsets.get(source).orNull,
numInputRows = numRecords,
inputRowsPerSecond = numRecords / inputTimeSec,
processedRowsPerSecond = numRecords / processingTimeSec
processedRowsPerSecond = numRecords / processingTimeSec,
metrics = sourceMetrics
)
}

@@ -33,6 +33,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.Evolving
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
/**
@@ -138,17 +139,6 @@ class StreamingQueryProgress private[sql](
override def toString: String = prettyJson
private[sql] def jsonValue: JValue = {
def safeDoubleToJValue(value: Double): JValue = {
if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
}
/** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
if (map.isEmpty) return JNothing
val keys = map.asScala.keySet.toSeq.sorted
keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
}
("id" -> JString(id.toString)) ~
("runId" -> JString(runId.toString)) ~
("name" -> JString(name)) ~
@@ -188,7 +178,8 @@ class SourceProgress protected[sql](
val latestOffset: String,
val numInputRows: Long,
val inputRowsPerSecond: Double,
val processedRowsPerSecond: Double) extends Serializable {
val processedRowsPerSecond: Double,
val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable {
/** The compact JSON representation of this progress. */
def json: String = compact(render(jsonValue))
@@ -199,17 +190,14 @@ class SourceProgress protected[sql](
override def toString: String = prettyJson
private[sql] def jsonValue: JValue = {
def safeDoubleToJValue(value: Double): JValue = {
if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
}
("description" -> JString(description)) ~
("startOffset" -> tryParse(startOffset)) ~
("endOffset" -> tryParse(endOffset)) ~
("latestOffset" -> tryParse(latestOffset)) ~
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
("startOffset" -> tryParse(startOffset)) ~
("endOffset" -> tryParse(endOffset)) ~
("latestOffset" -> tryParse(latestOffset)) ~
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
("metrics" -> safeMapToJValue[String](metrics, s => JString(s)))
}
private def tryParse(json: String) = try {
@@ -258,3 +246,16 @@ private[sql] object SinkProgress {
def apply(description: String, numOutputRows: Option[Long]): SinkProgress =
new SinkProgress(description, numOutputRows.getOrElse(DEFAULT_NUM_OUTPUT_ROWS))
}
private object SafeJsonSerializer {
def safeDoubleToJValue(value: Double): JValue = {
if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
}
/** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
if (map.isEmpty) return JNothing
val keys = map.asScala.keySet.toSeq.sorted
keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
}
}