[SPARK-17813][SQL][KAFKA] Maximum data per trigger

## What changes were proposed in this pull request?

Adds a `maxOffsetsPerTrigger` option for rate limiting; the specified total is split proportionally across topic-partitions based on their volume.

## How was this patch tested?

Added unit test

Author: cody koeninger <cody@koeninger.org>

Closes #15527 from koeninger/SPARK-17813.
Authored by cody koeninger on 2016-10-27 10:30:59 -07:00, committed by Shixiong Zhu
parent 701a9d361b
commit 1042325805
3 changed files with 156 additions and 26 deletions

docs/structured-streaming-kafka-integration.md

@@ -221,6 +221,12 @@ The following configurations are optional:
<td>10</td>
<td>milliseconds to wait before retrying to fetch Kafka offsets</td>
</tr>
<tr>
<td>maxOffsetsPerTrigger</td>
<td>long</td>
<td>none</td>
<td>Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td>
</tr>
</table>
Kafka's own configurations can be set via `DataStreamReader.option` with the `kafka.` prefix, e.g. `kafka.bootstrap.servers`.
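For illustration, a minimal sketch of a streaming read that uses the new option (the broker address and topic name below are placeholders, not values from this commit):

```scala
// Read from Kafka, processing at most ~10000 offsets per trigger,
// split proportionally across the subscribed topic-partitions.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "10000")
  .load()
```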

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

@@ -96,6 +96,9 @@ private[kafka010] case class KafkaSource(
private val offsetFetchAttemptIntervalMs =
sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
private val maxOffsetsPerTrigger =
sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
/**
* A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
* offsets and never commits them.
@@ -121,6 +124,8 @@ private[kafka010] case class KafkaSource(
}.partitionToOffsets
}
private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
override def schema: StructType = KafkaSource.kafkaSchema
/** Returns the maximum available offset for this source. */
@@ -128,9 +133,54 @@ private[kafka010] case class KafkaSource(
// Make sure initialPartitionOffsets is initialized
initialPartitionOffsets
val offset = KafkaSourceOffset(fetchLatestOffsets())
logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
Some(offset)
val latest = fetchLatestOffsets()
val offsets = maxOffsetsPerTrigger match {
case None =>
latest
case Some(limit) if currentPartitionOffsets.isEmpty =>
rateLimit(limit, initialPartitionOffsets, latest)
case Some(limit) =>
rateLimit(limit, currentPartitionOffsets.get, latest)
}
currentPartitionOffsets = Some(offsets)
logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
Some(KafkaSourceOffset(offsets))
}
/** Proportionally distribute limit number of offsets among topicpartitions */
private def rateLimit(
limit: Long,
from: Map[TopicPartition, Long],
until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
val sizes = until.flatMap {
case (tp, end) =>
// If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
val size = end - begin
logDebug(s"rateLimit $tp size is $size")
if (size > 0) Some(tp -> size) else None
}
}
val total = sizes.values.sum.toDouble
if (total < 1) {
until
} else {
until.map {
case (tp, end) =>
tp -> sizes.get(tp).map { size =>
val begin = from.get(tp).getOrElse(fromNew(tp))
val prorate = limit * (size / total)
logDebug(s"rateLimit $tp prorated amount is $prorate")
// Don't completely starve small topicpartitions
val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
logDebug(s"rateLimit $tp new offset is $off")
// Paranoia, make sure not to return an offset that's past end
Math.min(end, off)
}.getOrElse(end)
}
}
}
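To make the proration concrete, here is a standalone sketch (not the committed code) of the same arithmetic. It assumes every partition in `until` already appears in `from`, whereas the real `rateLimit` falls back to `fetchNewPartitionEarliestOffsets` for newly added partitions:

```scala
import org.apache.kafka.common.TopicPartition

// Simplified proration, for illustration only: split `limit` offsets across
// partitions in proportion to how many records each has pending.
def prorate(
    limit: Long,
    from: Map[TopicPartition, Long],
    until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  val sizes = until.map { case (tp, end) => tp -> math.max(end - from(tp), 0L) }
  val total = sizes.values.sum.toDouble
  if (total < 1) until
  else until.map { case (tp, end) =>
    val share = limit * (sizes(tp) / total)
    // round sub-1 shares up so small partitions are not starved completely
    val step = (if (share < 1) math.ceil(share) else math.floor(share)).toLong
    tp -> math.min(end, from(tp) + step)
  }
}
```

With `limit = 10` and pending sizes of 101, 11 and 1 records, the shares come out to roughly 8.94, 0.97 and 0.09, so the trigger takes 8, 1 and 1 offsets respectively.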
/**
@@ -153,11 +203,7 @@ private[kafka010] case class KafkaSource(
// Find the new partitions, and get their earliest offsets
val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
val newPartitionOffsets = if (newPartitions.nonEmpty) {
fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
} else {
Map.empty[TopicPartition, Long]
}
val newPartitionOffsets = fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
if (newPartitionOffsets.keySet != newPartitions) {
// We cannot get from offsets for some partitions. It means they got deleted.
val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
@@ -221,6 +267,12 @@ private[kafka010] case class KafkaSource(
logInfo("GetBatch generating RDD of offset range: " +
offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
// On recovery, getBatch will get called before getOffset
if (currentPartitionOffsets.isEmpty) {
currentPartitionOffsets = Some(untilPartitionOffsets)
}
sqlContext.createDataFrame(rdd, schema)
}
@@ -305,23 +357,28 @@ private[kafka010] case class KafkaSource(
* some partitions if they are deleted.
*/
private def fetchNewPartitionEarliestOffsets(
newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
consumer.pause(partitions)
logDebug(s"\tPartitions assigned to consumer: $partitions")
newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] =
if (newPartitions.isEmpty) {
Map.empty[TopicPartition, Long]
} else {
withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
consumer.pause(partitions)
logDebug(s"\tPartitions assigned to consumer: $partitions")
// Get the earliest offset of each partition
consumer.seekToBeginning(partitions)
val partitionOffsets = newPartitions.filter { p =>
// When deleting topics happen at the same time, some partitions may not be in `partitions`.
// So we need to ignore them
partitions.contains(p)
}.map(p => p -> consumer.position(p)).toMap
logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
partitionOffsets
}
// Get the earliest offset of each partition
consumer.seekToBeginning(partitions)
val partitionOffsets = newPartitions.filter { p =>
// When deleting topics happen at the same time, some partitions may not be in
// `partitions`. So we need to ignore them
partitions.contains(p)
}.map(p => p -> consumer.position(p)).toMap
logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
partitionOffsets
}
}
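Note that moving the empty check into `fetchNewPartitionEarliestOffsets` itself lets `rateLimit` call it on every trigger without touching the consumer when no new partitions have appeared, and is why the caller in `getBatch` no longer needs its own `nonEmpty` guard.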
/**
* Helper function that does multiple retries on a body of code that returns offsets.

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala

@@ -23,13 +23,14 @@ import scala.util.Random
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.streaming.{ ProcessingTime, StreamTest }
import org.apache.spark.sql.test.SharedSQLContext
abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
protected var testUtils: KafkaTestUtils = _
@@ -133,6 +134,72 @@ class KafkaSourceSuite extends KafkaSourceTest {
private val topicId = new AtomicInteger(0)
test("maxOffsetsPerTrigger") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
testUtils.sendMessages(topic, Array("1"), Some(2))
val reader = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("maxOffsetsPerTrigger", 10)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
val kafka = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
val clock = new StreamManualClock
val waitUntilBatchProcessed = AssertOnQuery { q =>
eventually(Timeout(streamingTimeout)) {
if (!q.exception.isDefined) {
assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
}
}
if (q.exception.isDefined) {
throw q.exception.get
}
true
}
testStream(mapped)(
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
// 1 from smallest, 1 from middle, 8 from biggest
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
AdvanceManualClock(100),
waitUntilBatchProcessed,
// smallest now empty, 1 more from middle, 9 more from biggest
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
11, 108, 109, 110, 111, 112, 113, 114, 115, 116
),
StopStream,
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
AdvanceManualClock(100),
waitUntilBatchProcessed,
// smallest now empty, 1 more from middle, 9 more from biggest
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
12, 117, 118, 119, 120, 121, 122, 123, 124, 125
),
AdvanceManualClock(100),
waitUntilBatchProcessed,
// smallest now empty, 1 more from middle, 9 more from biggest
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
13, 126, 127, 128, 129, 130, 131, 132, 133, 134
)
)
}
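For reference, the expected answers above follow directly from the proration in `rateLimit`: the three partitions start with 101, 11 and 1 pending records (113 in total), so with a limit of 10 the first trigger takes floor(10 * 101/113) = 8 offsets from the largest partition and, rounding the sub-1 shares up, 1 offset from each of the other two; the second trigger sees 93, 10 and 0 pending records and takes 9, 1 and 0.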
test("cannot stop Kafka stream") {
val topic = newTopic()
testUtils.createTopic(newTopic(), partitions = 5)