[SPARK-24863][SS] Report Kafka offset lag as a custom metrics

## What changes were proposed in this pull request?

This builds on top of SPARK-24748 to report 'offset lag' as a custom metric for the Kafka structured streaming source.

The lag is the difference between the latest offsets in Kafka at the time the metric is reported (just after a micro-batch completes) and the latest offsets Spark has processed. It can be 0 (or close to 0) if, in steady state, Spark keeps up with the rate at which messages are ingested into the Kafka topics. The metric measures how far behind the Kafka source has fallen (per partition) and can aid in tuning the application.
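As a concrete illustration, here is a minimal sketch of the computation with made-up offsets (the topic name and numbers are hypothetical, not taken from this patch):

```scala
// Illustrative only: per-partition lag =
//   latest offset available in Kafka - latest offset processed by Spark
import org.apache.kafka.common.TopicPartition

val latest = Map(new TopicPartition("events", 0) -> 1000L, new TopicPartition("events", 1) -> 800L)
val processed = Map(new TopicPartition("events", 0) -> 955L, new TopicPartition("events", 1) -> 800L)

val lag = latest.map { case (tp, end) => tp -> (end - processed.getOrElse(tp, 0L)) }
// lag == Map(events-0 -> 45, events-1 -> 0); a lag of 0 means the source is keeping up on that partition
```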

## How was this patch tested?

Existing and new unit tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #21819 from arunmahadevan/SPARK-24863.

Authored-by: Arun Mahadevan <arunm@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
commit 14d7c1c3e9 (parent 9047cc0f2c)
Date: 2018-08-18 17:31:52 +08:00
3 changed files with 85 additions and 12 deletions

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala

@@ -29,6 +29,11 @@ import org.json4s.jackson.Serialization
  */
 private object JsonUtils {
   private implicit val formats = Serialization.formats(NoTypeHints)
+  implicit val ordering = new Ordering[TopicPartition] {
+    override def compare(x: TopicPartition, y: TopicPartition): Int = {
+      Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+    }
+  }
 
   /**
    * Read TopicPartitions from json string
@@ -51,7 +56,7 @@ private object JsonUtils {
    * Write TopicPartitions as json string
    */
   def partitions(partitions: Iterable[TopicPartition]): String = {
-    val result = new HashMap[String, List[Int]]
+    val result = HashMap.empty[String, List[Int]]
     partitions.foreach { tp =>
       val parts: List[Int] = result.getOrElse(tp.topic, Nil)
       result += tp.topic -> (tp.partition::parts)
@@ -80,19 +85,31 @@ private object JsonUtils {
    * Write per-TopicPartition offsets as json string
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
-    val result = new HashMap[String, HashMap[Int, Long]]()
-    implicit val ordering = new Ordering[TopicPartition] {
-      override def compare(x: TopicPartition, y: TopicPartition): Int = {
-        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
-      }
-    }
+    val result = HashMap.empty[String, HashMap[Int, Long]]
     val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
     partitions.foreach { tp =>
       val off = partitionOffsets(tp)
-      val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
+      val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
       parts += tp.partition -> off
       result += tp.topic -> parts
     }
     Serialization.write(result)
   }
+
+  /**
+   * Write per-topic partition lag as json string
+   */
+  def partitionLags(
+      latestOffsets: Map[TopicPartition, Long],
+      processedOffsets: Map[TopicPartition, Long]): String = {
+    val result = HashMap.empty[String, HashMap[Int, Long]]
+    val partitions = latestOffsets.keySet.toSeq.sorted
+    partitions.foreach { tp =>
+      val lag = latestOffsets(tp) - processedOffsets.getOrElse(tp, 0L)
+      val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
+      parts += tp.partition -> lag
+      result += tp.topic -> parts
+    }
+    Serialization.write(Map("lag" -> result))
+  }
 }
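For reference, a rough sketch of the JSON shape that the new `partitionLags` helper serializes. The topic name and offsets are hypothetical, and `JsonUtils` is package-private, so this only runs from within `org.apache.spark.sql.kafka010`:

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical offsets for a two-partition topic named "topic-1".
val latest = Map(new TopicPartition("topic-1", 0) -> 100L, new TopicPartition("topic-1", 1) -> 100L)
val processed = Map(new TopicPartition("topic-1", 0) -> 55L, new TopicPartition("topic-1", 1) -> 55L)

// Produces a nested topic -> partition -> lag structure, e.g.:
//   {"lag":{"topic-1":{"0":45,"1":45}}}
println(JsonUtils.partitionLags(latest, processed))
```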

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala

@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets
 import scala.collection.JavaConverters._
 
 import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
@@ -33,9 +34,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset, SupportsCustomReaderMetrics}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.UninterruptibleThread
 
@@ -62,7 +63,7 @@ private[kafka010] class KafkaMicroBatchReader(
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
     failOnDataLoss: Boolean)
-  extends MicroBatchReader with Logging {
+  extends MicroBatchReader with SupportsCustomReaderMetrics with Logging {
 
   private var startPartitionOffsets: PartitionOffsetMap = _
   private var endPartitionOffsets: PartitionOffsetMap = _
@@ -158,6 +159,10 @@ private[kafka010] class KafkaMicroBatchReader(
     KafkaSourceOffset(endPartitionOffsets)
   }
 
+  override def getCustomMetrics: CustomMetrics = {
+    KafkaCustomMetrics(kafkaOffsetReader.fetchLatestOffsets(), endPartitionOffsets)
+  }
+
   override def deserializeOffset(json: String): Offset = {
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
@@ -380,3 +385,18 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader(
     }
   }
 }
+
+/**
+ * Currently reports per topic-partition lag.
+ * This is the difference between the offset of the latest available data
+ * in a topic-partition and the latest offset that has been processed.
+ */
+private[kafka010] case class KafkaCustomMetrics(
+    latestOffsets: Map[TopicPartition, Long],
+    processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics {
+  override def json(): String = {
+    JsonUtils.partitionLags(latestOffsets, processedOffsets)
+  }
+
+  override def toString: String = json()
+}
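As context, a hedged sketch of how the reported metric could be read back from query progress once `SupportsCustomReaderMetrics` is wired in. The `kafkaLag` helper below is illustrative, not part of this patch; it assumes the `customMetrics` JSON string that SPARK-24748 exposes on `SourceProgress`:

```scala
import org.apache.spark.sql.streaming.StreamingQuery
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._

// Returns topic -> (partition -> lag) as last reported by the Kafka micro-batch source.
def kafkaLag(query: StreamingQuery): Map[String, Map[String, Long]] = {
  implicit val formats = DefaultFormats
  val source = query.lastProgress.sources(0)
  parse(source.customMetrics)
    .extract[Map[String, Map[String, Map[String, Long]]]]("lag")
}
```

A monitoring hook could call something like this after each progress event and alert when the lag for any partition keeps growing.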

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

@@ -31,12 +31,13 @@ import scala.util.Random
 
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
-import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
@@ -701,6 +702,41 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
     intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) }
   }
 
+  test("custom lag metrics") {
+    import testImplicits._
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 100).map(_.toString).toArray)
+    require(testUtils.getLatestOffsets(Set(topic)).size === 2)
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("subscribe", topic)
+      .option("startingOffsets", s"earliest")
+      .option("maxOffsetsPerTrigger", 10)
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+
+    implicit val formats = DefaultFormats
+
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+    testStream(mapped)(
+      StartStream(trigger = OneTimeTrigger),
+      AssertOnQuery { query =>
+        query.awaitTermination()
+        val source = query.lastProgress.sources(0)
+        // maxOffsetsPerTrigger is 10, and there are two partitions containing 50 events each
+        // so 5 events should be processed from each partition, leaving a lag of 45 events
+        val custom = parse(source.customMetrics)
+          .extract[Map[String, Map[String, Map[String, Long]]]]
+        custom("lag")(topic)("0") == 45 && custom("lag")(topic)("1") == 45
+      }
+    )
+  }
+
 }
 
 abstract class KafkaSourceSuiteBase extends KafkaSourceTest {