[SPARK-23362][SS] Migrate Kafka Microbatch source to v2
## What changes were proposed in this pull request? Migrating KafkaSource (with data source v1) to KafkaMicroBatchReader (with data source v2). Performance comparison: In a unit test with in-process Kafka broker, I tested the read throughput of V1 and V2 using 20M records in a single partition. They were comparable. ## How was this patch tested? Existing tests, few modified to be better tests than the existing ones. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #20554 from tdas/SPARK-23362.
This commit is contained in:
parent
c5857e496f
commit
0a73aa31f4
|
@ -105,3 +105,4 @@ META-INF/*
|
|||
spark-warehouse
|
||||
structured-streaming/*
|
||||
kafka-source-initial-offset-version-2.1.0.bin
|
||||
kafka-source-initial-offset-future-version.bin
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition
|
|||
|
||||
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.kafka010.KafkaSource._
|
||||
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
|
||||
import org.apache.spark.util.UninterruptibleThread
|
||||
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
|
|||
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
|
||||
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
|
||||
import org.apache.spark.sql.catalyst.util.DateTimeUtils
|
||||
import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
|
||||
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
|
||||
import org.apache.spark.sql.sources.v2.reader._
|
||||
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
|
||||
import org.apache.spark.sql.types.StructType
|
||||
|
@ -187,13 +187,9 @@ class KafkaContinuousDataReader(
|
|||
kafkaParams: ju.Map[String, Object],
|
||||
pollTimeoutMs: Long,
|
||||
failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] {
|
||||
private val topic = topicPartition.topic
|
||||
private val kafkaPartition = topicPartition.partition
|
||||
private val consumer = CachedKafkaConsumer.createUncached(topic, kafkaPartition, kafkaParams)
|
||||
|
||||
private val sharedRow = new UnsafeRow(7)
|
||||
private val bufferHolder = new BufferHolder(sharedRow)
|
||||
private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)
|
||||
private val consumer =
|
||||
CachedKafkaConsumer.createUncached(topicPartition.topic, topicPartition.partition, kafkaParams)
|
||||
private val converter = new KafkaRecordToUnsafeRowConverter
|
||||
|
||||
private var nextKafkaOffset = startOffset
|
||||
private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _
|
||||
|
@ -232,22 +228,7 @@ class KafkaContinuousDataReader(
|
|||
}
|
||||
|
||||
override def get(): UnsafeRow = {
|
||||
bufferHolder.reset()
|
||||
|
||||
if (currentRecord.key == null) {
|
||||
rowWriter.setNullAt(0)
|
||||
} else {
|
||||
rowWriter.write(0, currentRecord.key)
|
||||
}
|
||||
rowWriter.write(1, currentRecord.value)
|
||||
rowWriter.write(2, UTF8String.fromString(currentRecord.topic))
|
||||
rowWriter.write(3, currentRecord.partition)
|
||||
rowWriter.write(4, currentRecord.offset)
|
||||
rowWriter.write(5,
|
||||
DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(currentRecord.timestamp)))
|
||||
rowWriter.write(6, currentRecord.timestampType.id)
|
||||
sharedRow.setTotalSize(bufferHolder.totalSize)
|
||||
sharedRow
|
||||
converter.toUnsafeRow(currentRecord)
|
||||
}
|
||||
|
||||
override def getOffset(): KafkaSourcePartitionOffset = {
|
||||
|
|
403
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
vendored
Normal file
403
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
vendored
Normal file
|
@ -0,0 +1,403 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.kafka010
|
||||
|
||||
import java.{util => ju}
|
||||
import java.io._
|
||||
import java.nio.charset.StandardCharsets
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.commons.io.IOUtils
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
|
||||
import org.apache.spark.SparkEnv
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
|
||||
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
|
||||
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
|
||||
import org.apache.spark.sql.sources.v2.DataSourceOptions
|
||||
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow}
|
||||
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
|
||||
import org.apache.spark.sql.types.StructType
|
||||
import org.apache.spark.util.UninterruptibleThread
|
||||
|
||||
/**
|
||||
* A [[MicroBatchReader]] that reads data from Kafka.
|
||||
*
|
||||
* The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
|
||||
* a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
|
||||
* example if the last record in a Kafka topic "t", partition 2 is offset 5, then
|
||||
* KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
|
||||
* with the semantics of `KafkaConsumer.position()`.
|
||||
*
|
||||
* Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
|
||||
* must make sure all messages in a topic have been processed when deleting a topic.
|
||||
*
|
||||
* There is a known issue caused by KAFKA-1894: a query using Kafka may be impossible to stop.
|
||||
* To avoid this issue, you should make sure to stop the query before stopping the Kafka brokers
|
||||
* and not use wrong broker addresses.
|
||||
*/
|
||||
private[kafka010] class KafkaMicroBatchReader(
|
||||
kafkaOffsetReader: KafkaOffsetReader,
|
||||
executorKafkaParams: ju.Map[String, Object],
|
||||
options: DataSourceOptions,
|
||||
metadataPath: String,
|
||||
startingOffsets: KafkaOffsetRangeLimit,
|
||||
failOnDataLoss: Boolean)
|
||||
extends MicroBatchReader with SupportsScanUnsafeRow with Logging {
|
||||
|
||||
type PartitionOffsetMap = Map[TopicPartition, Long]
|
||||
|
||||
private var startPartitionOffsets: PartitionOffsetMap = _
|
||||
private var endPartitionOffsets: PartitionOffsetMap = _
|
||||
|
||||
private val pollTimeoutMs = options.getLong(
|
||||
"kafkaConsumer.pollTimeoutMs",
|
||||
SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
|
||||
|
||||
private val maxOffsetsPerTrigger =
|
||||
Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
|
||||
|
||||
/**
|
||||
* Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
|
||||
* called in StreamExecutionThread. Otherwise, interrupting a thread while running
|
||||
* `KafkaConsumer.poll` may hang forever (KAFKA-1894).
|
||||
*/
|
||||
private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
|
||||
|
||||
/**
 * Records the start and end partition offsets for the next batch.
 *
 * @param start offset to start from; when absent, the initial partition offsets are used
 * @param end offset to read until; when absent, the latest offsets are fetched and, if
 *            `maxOffsetsPerTrigger` is set, capped through `rateLimit`
 */
override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
  // Force the lazy val so the initial offsets are resolved before anything else happens.
  initialPartitionOffsets

  startPartitionOffsets =
    if (start.isPresent) {
      start.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets
    } else {
      initialPartitionOffsets
    }

  endPartitionOffsets =
    if (end.isPresent) {
      end.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets
    } else {
      val latest = kafkaOffsetReader.fetchLatestOffsets()
      maxOffsetsPerTrigger match {
        case Some(maxOffsets) => rateLimit(maxOffsets, startPartitionOffsets, latest)
        case None => latest
      }
    }
}
|
||||
|
||||
/**
 * Builds one reader factory per topic partition for the range recorded by `setOffsetRange`.
 *
 * Partitions found only in the end offsets are treated as new and start from their fetched
 * earliest offset. Partitions found only in the start offsets are reported through
 * `reportDataLoss` and skipped. A partition whose end offset is below its start offset is
 * reported as well and yields no factory.
 */
override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
  // Partitions that only appear in the end offsets are new: look up where they begin.
  val addedPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
  val addedPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(addedPartitions.toSeq)
  if (addedPartitionOffsets.keySet != addedPartitions) {
    // No earliest offset could be fetched for some new partitions, i.e. they were deleted.
    val missing = addedPartitions.diff(addedPartitionOffsets.keySet)
    reportDataLoss(
      s"Cannot find earliest offsets of ${missing}. Some data may have been missed")
  }
  logInfo(s"Partitions added: $addedPartitionOffsets")
  for ((partition, earliest) <- addedPartitionOffsets if earliest != 0) {
    reportDataLoss(
      s"Added partition $partition starts from $earliest instead of 0. " +
        "Some data may have been missed")
  }

  // Partitions that only appear in the start offsets have disappeared.
  val removedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
  if (removedPartitions.nonEmpty) {
    reportDataLoss(s"$removedPartitions are gone. Some data may have been missed")
  }

  // Iterate over the end offsets so that deleted partitions are ignored, and keep only the
  // partitions for which a starting offset is known.
  val readablePartitions = endPartitionOffsets.keySet.filter { tp =>
    addedPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
  }.toSeq
  logDebug("TopicPartitions: " + readablePartitions.mkString(", "))

  val executors = getSortedExecutorList()
  val numExecutors = executors.length
  logDebug("Sorted executors: " + executors.mkString(", "))

  // Turn every readable partition into an offset range and a factory for it.
  val factories = readablePartitions.flatMap { tp =>
    val fromOffset = startPartitionOffsets.get(tp)
      .orElse(addedPartitionOffsets.get(tp))
      // Should not happen: addedPartitionOffsets covers every partition missing from the
      // start offsets.
      .getOrElse(throw new IllegalStateException(s"$tp doesn't have a from offset"))
    val untilOffset = endPartitionOffsets(tp)

    val maybeFactory: Option[DataReaderFactory[UnsafeRow]] =
      if (untilOffset < fromOffset) {
        reportDataLoss(
          s"Partition $tp's offset was changed from " +
            s"$fromOffset to $untilOffset, some data may have been missed")
        None
      } else {
        // Pin a partition to the same executor in every batch so that cached KafkaConsumers
        // in the executors can be re-used.
        val preferredLoc =
          if (numExecutors > 0) {
            Some(executors(Math.floorMod(tp.hashCode, numExecutors)))
          } else {
            None
          }
        Some(
          new KafkaMicroBatchDataReaderFactory(
            KafkaOffsetRange(tp, fromOffset, untilOffset),
            preferredLoc,
            executorKafkaParams,
            pollTimeoutMs,
            failOnDataLoss))
      }
    maybeFactory
  }
  factories.asJava
}
|
||||
|
||||
/** Returns the start partition offsets of the current range wrapped in a [[KafkaSourceOffset]]. */
override def getStartOffset: Offset = KafkaSourceOffset(startPartitionOffsets)
|
||||
|
||||
/** Returns the end partition offsets of the current range wrapped in a [[KafkaSourceOffset]]. */
override def getEndOffset: Offset = KafkaSourceOffset(endPartitionOffsets)
|
||||
|
||||
/** Parses a JSON string of partition offsets into a [[KafkaSourceOffset]]. */
override def deserializeOffset(json: String): Offset = {
  val partitionOffsets = JsonUtils.partitionOffsets(json)
  KafkaSourceOffset(partitionOffsets)
}
|
||||
|
||||
/** Returns the schema defined by `KafkaOffsetReader.kafkaSchema`. */
override def readSchema(): StructType = {
  KafkaOffsetReader.kafkaSchema
}
|
||||
|
||||
/** No-op: this reader does nothing when an offset is committed. */
override def commit(end: Offset): Unit = ()
|
||||
|
||||
/** Closes the underlying `KafkaOffsetReader`. */
override def stop(): Unit = kafkaOffsetReader.close()
|
||||
|
||||
/** Human-readable name of this reader, embedding the offset reader's string form. */
override def toString(): String = {
  s"Kafka[$kafkaOffsetReader]"
}
|
||||
|
||||
/**
|
||||
* Read initial partition offsets from the checkpoint, or decide the offsets and write them to
|
||||
* the checkpoint.
|
||||
*/
|
||||
private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
  // `KafkaConsumer.poll` must only be called in StreamExecutionThread; interrupting any other
  // thread while it runs `KafkaConsumer.poll` may hang forever (KAFKA-1894).
  assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])

  // A SparkSession is needed to obtain the Hadoop configuration used for checkpoint writes.
  val activeSession = SparkSession.getActiveSession
  assert(activeSession.nonEmpty)

  val initialOffsetLog = new KafkaSourceInitialOffsetWriter(activeSession.get, metadataPath)
  val initialOffsets = initialOffsetLog.get(0) match {
    case Some(checkpointed) =>
      // Batch 0 already exists in the log: reuse what was stored there.
      checkpointed
    case None =>
      // Nothing stored yet: resolve the offsets from the configured limit and persist them.
      val resolved = startingOffsets match {
        case EarliestOffsetRangeLimit =>
          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
        case LatestOffsetRangeLimit =>
          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
        case SpecificOffsetRangeLimit(p) =>
          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
      }
      initialOffsetLog.add(0, resolved)
      logInfo(s"Initial offsets: $resolved")
      resolved
  }
  initialOffsets.partitionToOffsets
}
|
||||
|
||||
/** Proportionally distribute limit number of offsets among topicpartitions */
|
||||
private def rateLimit(
    limit: Long,
    from: PartitionOffsetMap,
    until: PartitionOffsetMap): PartitionOffsetMap = {
  // Partitions missing from `from` are new; use their earliest offsets as the starting point.
  val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)

  // Number of pending offsets per partition; partitions with nothing to read are left out.
  val sizes = until.flatMap {
    case (tp, end) =>
      // If begin isn't defined, something's wrong, but let the data-loss reporting in
      // createUnsafeRowReaderFactories handle it
      from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
        val size = end - begin
        logDebug(s"rateLimit $tp size is $size")
        if (size > 0) Some(tp -> size) else None
      }
  }
  val total = sizes.values.sum.toDouble
  if (total < 1) {
    // Nothing pending anywhere: no limiting required.
    until
  } else {
    until.map {
      case (tp, end) =>
        tp -> sizes.get(tp).map { size =>
          val begin = from.get(tp).getOrElse(fromNew(tp))
          val prorate = limit * (size / total)
          // Don't completely starve small topicpartitions
          val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
          // Guard against Long overflow: with a very large limit, `begin + prorateLong` would
          // wrap around to a negative offset and defeat the `Math.min` cap below.
          val off = if (begin > 0 && prorateLong > Long.MaxValue - begin) {
            Long.MaxValue
          } else {
            begin + prorateLong
          }
          // Paranoia, make sure not to return an offset that's past end
          Math.min(end, off)
        }.getOrElse(end)
    }
  }
}
|
||||
|
||||
/**
 * Returns the string form of every peer block manager's executor location, ordered by host
 * and then executor id, both descending.
 */
private def getSortedExecutorList(): Array[String] = {
  val blockManager = SparkEnv.get.blockManager
  val peerLocations = blockManager.master
    .getPeers(blockManager.blockManagerId)
    .toArray
    .map(peer => ExecutorCacheTaskLocation(peer.host, peer.executorId))

  // Descending by host; ties on host are broken by descending executor id.
  val ordered = peerLocations.sortWith { (a, b) =>
    if (a.host != b.host) a.host > b.host else a.executorId > b.executorId
  }
  ordered.map(_.toString)
}
|
||||
|
||||
/**
|
||||
* If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
|
||||
* Otherwise, just log a warning.
|
||||
*/
|
||||
private def reportDataLoss(message: String): Unit = {
  if (!failOnDataLoss) {
    // Best-effort mode: record the problem and carry on.
    logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
  } else {
    throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
  }
}
|
||||
|
||||
/** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
|
||||
class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
  extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {

  // Version number written on the first line of the log, prefixed with "v".
  val VERSION = 1

  // Layout: one zero byte, then the line "v<VERSION>", then the offset as JSON.
  override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
    out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
    val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
    writer.write(s"v$VERSION\n")
    writer.write(metadata.json)
    writer.flush()
  }

  // Accepts both the versioned layout written by `serialize` and content without a leading
  // "v" version line, which is treated as a log generated by Spark 2.1.0.
  override def deserialize(in: InputStream): KafkaSourceOffset = {
    in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
    val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
    // HDFSMetadataLog guarantees that it never creates a partial file.
    assert(content.length != 0)
    if (content(0) != 'v') {
      // The log was generated by Spark 2.1.0
      KafkaSourceOffset(SerializedOffset(content))
    } else {
      val newlineAt = content.indexOf("\n")
      if (newlineAt <= 0) {
        throw new IllegalStateException(
          "Log file was malformed: failed to detect the log file version line.")
      }
      // Called only to validate the version line; the parsed number itself is not needed.
      parseVersion(content.substring(0, newlineAt), VERSION)
      KafkaSourceOffset(SerializedOffset(content.substring(newlineAt + 1)))
    }
  }
}
|
||||
}
|
||||
|
||||
/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
|
||||
private[kafka010] class KafkaMicroBatchDataReaderFactory(
    range: KafkaOffsetRange,
    preferredLoc: Option[String],
    executorKafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {

  // Zero or one location, depending on whether a preferred location was supplied.
  override def preferredLocations(): Array[String] = preferredLoc match {
    case Some(location) => Array(location)
    case None => Array.empty[String]
  }

  // Every call creates a fresh reader over the same offset range.
  override def createDataReader(): DataReader[UnsafeRow] = {
    new KafkaMicroBatchDataReader(range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
  }
}
|
||||
|
||||
/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
|
||||
private[kafka010] class KafkaMicroBatchDataReader(
    offsetRange: KafkaOffsetRange,
    executorKafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean) extends DataReader[UnsafeRow] with Logging {

  // The consumer must be obtained before `rangeToRead`, because resolving the range may query it.
  private val consumer = CachedKafkaConsumer.getOrCreate(
    offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
  private val rangeToRead = resolveRange(offsetRange)
  private val converter = new KafkaRecordToUnsafeRowConverter

  // Offset of the next record to request from the consumer.
  private var nextOffset = rangeToRead.fromOffset
  // Row produced by the last successful `next()`; null until then.
  private var nextRow: UnsafeRow = _

  /**
   * Fetches the next record in the range, if any, and converts it to a row.
   *
   * @return true when a row is available through `get()`, false when the range is exhausted or
   *         the consumer returned no record
   */
  override def next(): Boolean = {
    if (nextOffset < rangeToRead.untilOffset) {
      val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
      if (record != null) {
        nextRow = converter.toUnsafeRow(record)
        // Advance from the offset actually returned, not from the one requested.
        // NOTE(review): presumably the consumer can return a later offset than requested when
        // failOnDataLoss is false; advancing by 1 would then re-read records - confirm against
        // CachedKafkaConsumer.get.
        nextOffset = record.offset + 1
        true
      } else {
        false
      }
    } else {
      false
    }
  }

  /**
   * Returns the row produced by the last successful `next()`. The read position is advanced in
   * `next()`, so calling this method more than once does not skip records.
   */
  override def get(): UnsafeRow = {
    assert(nextRow != null)
    nextRow
  }

  override def close(): Unit = {
    // Indicate that we're no longer using this consumer
    CachedKafkaConsumer.releaseKafkaConsumer(
      offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
  }

  /**
   * Replaces negative (late-bound) range limits with the consumer's currently available
   * earliest/latest offsets. A range without negative limits is returned unchanged.
   */
  private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
    if (range.fromOffset < 0 || range.untilOffset < 0) {
      // Late bind the offset range
      val availableOffsetRange = consumer.getAvailableOffsetRange()
      val fromOffset = if (range.fromOffset < 0) {
        assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST,
          s"earliest offset ${range.fromOffset} does not equal ${KafkaOffsetRangeLimit.EARLIEST}")
        availableOffsetRange.earliest
      } else {
        range.fromOffset
      }
      val untilOffset = if (range.untilOffset < 0) {
        assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST,
          s"latest offset ${range.untilOffset} does not equal ${KafkaOffsetRangeLimit.LATEST}")
        availableOffsetRange.latest
      } else {
        range.untilOffset
      }
      KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset)
    } else {
      range
    }
  }
}
|
||||
|
||||
/**
 * A range of offsets of one topic partition, given by a `fromOffset` and an `untilOffset`.
 */
private[kafka010] case class KafkaOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long)
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.kafka010
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||
|
||||
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
|
||||
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
|
||||
import org.apache.spark.sql.catalyst.util.DateTimeUtils
|
||||
import org.apache.spark.unsafe.types.UTF8String
|
||||
|
||||
/** A simple class for converting Kafka ConsumerRecord to UnsafeRow */
|
||||
private[kafka010] class KafkaRecordToUnsafeRowConverter {
  // A single row, buffer and writer are reused for every record, so a returned row is only
  // valid until the next call to `toUnsafeRow`.
  private val sharedRow = new UnsafeRow(7)
  private val bufferHolder = new BufferHolder(sharedRow)
  private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)

  /**
   * Writes the record's key, value, topic, partition, offset, timestamp and timestamp type id
   * into the shared row (fields 0 to 6, in that order) and returns it.
   *
   * A null key or a null value is stored as a null field.
   *
   * @param record the Kafka record to convert
   * @return the shared row, overwritten on every call
   */
  def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
    bufferHolder.reset()
    // The row is reused, so clear the null bits a previous record may have set; otherwise a
    // field nulled once would keep reading as null for every later record.
    rowWriter.zeroOutNullBytes()

    if (record.key == null) {
      rowWriter.setNullAt(0)
    } else {
      rowWriter.write(0, record.key)
    }
    // A record value may be null as well; guard it the same way as the key.
    if (record.value == null) {
      rowWriter.setNullAt(1)
    } else {
      rowWriter.write(1, record.value)
    }
    rowWriter.write(2, UTF8String.fromString(record.topic))
    rowWriter.write(3, record.partition)
    rowWriter.write(4, record.offset)
    rowWriter.write(
      5,
      DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(record.timestamp)))
    rowWriter.write(6, record.timestampType.id)
    sharedRow.setTotalSize(bufferHolder.totalSize)
    sharedRow
  }
}
|
|
@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
|
|||
import org.apache.spark.sql.catalyst.util.DateTimeUtils
|
||||
import org.apache.spark.sql.execution.streaming._
|
||||
import org.apache.spark.sql.kafka010.KafkaSource._
|
||||
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.unsafe.types.UTF8String
|
||||
|
||||
|
@ -306,7 +307,7 @@ private[kafka010] class KafkaSource(
|
|||
kafkaReader.close()
|
||||
}
|
||||
|
||||
override def toString(): String = s"KafkaSource[$kafkaReader]"
|
||||
override def toString(): String = s"KafkaSourceV1[$kafkaReader]"
|
||||
|
||||
/**
|
||||
* If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
|
||||
|
@ -323,22 +324,6 @@ private[kafka010] class KafkaSource(
|
|||
|
||||
/** Companion object for the [[KafkaSource]]. */
|
||||
private[kafka010] object KafkaSource {
|
||||
val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
|
||||
"""
|
||||
|Some data may have been lost because they are not available in Kafka any more; either the
|
||||
| data was aged out by Kafka or the topic may have been deleted before all the data in the
|
||||
| topic was processed. If you want your streaming query to fail on such cases, set the source
|
||||
| option "failOnDataLoss" to "true".
|
||||
""".stripMargin
|
||||
|
||||
val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE =
|
||||
"""
|
||||
|Some data may have been lost because they are not available in Kafka any more; either the
|
||||
| data was aged out by Kafka or the topic may have been deleted before all the data in the
|
||||
| topic was processed. If you don't want your streaming query to fail on such cases, set the
|
||||
| source option "failOnDataLoss" to "false".
|
||||
""".stripMargin
|
||||
|
||||
private[kafka010] val VERSION = 1
|
||||
|
||||
def getSortedExecutorList(sc: SparkContext): Array[String] = {
|
||||
|
|
|
@ -30,13 +30,13 @@ import org.apache.spark.internal.Logging
|
|||
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
|
||||
import org.apache.spark.sql.execution.streaming.{Sink, Source}
|
||||
import org.apache.spark.sql.sources._
|
||||
import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport}
|
||||
import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
|
||||
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
|
||||
import org.apache.spark.sql.streaming.OutputMode
|
||||
import org.apache.spark.sql.types.StructType
|
||||
|
||||
/**
|
||||
* The provider class for the [[KafkaSource]]. This provider is designed such that it throws
|
||||
* The provider class for all Kafka readers and writers. It is designed such that it throws
|
||||
* IllegalArgumentException when the Kafka Dataset is created, so that it can catch
|
||||
* missing options even before the query is started.
|
||||
*/
|
||||
|
@ -47,6 +47,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
|
|||
with CreatableRelationProvider
|
||||
with StreamWriteSupport
|
||||
with ContinuousReadSupport
|
||||
with MicroBatchReadSupport
|
||||
with Logging {
|
||||
import KafkaSourceProvider._
|
||||
|
||||
|
@ -105,6 +106,52 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
|
|||
failOnDataLoss(caseInsensitiveParams))
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read batches
|
||||
* of Kafka data in a micro-batch streaming query.
|
||||
*/
|
||||
override def createMicroBatchReader(
    schema: Optional[StructType],
    metadataPath: String,
    options: DataSourceOptions): KafkaMicroBatchReader = {

  val parameters = options.asMap().asScala.toMap
  validateStreamOptions(parameters)

  // Each running query should use its own group id. Otherwise, the query may be only assigned
  // partial data since Kafka will assign partitions to multiple consumers having the same group
  // id. Hence, we should generate a unique id for each query.
  val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

  val caseInsensitiveParams = parameters.map { case (key, value) =>
    key.toLowerCase(Locale.ROOT) -> value
  }

  // Options prefixed with "kafka." (matched case-insensitively) are handed to Kafka with the
  // six-character prefix removed.
  val kafkaPrefixedKeys =
    parameters.keySet.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
  val specifiedKafkaParams =
    kafkaPrefixedKeys.map(key => key.drop(6) -> parameters(key)).toMap

  val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
    caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)

  val kafkaOffsetReader = new KafkaOffsetReader(
    strategy(caseInsensitiveParams),
    kafkaParamsForDriver(specifiedKafkaParams),
    parameters,
    driverGroupIdPrefix = s"$uniqueGroupId-driver")

  val executorKafkaParams = kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId)
  new KafkaMicroBatchReader(
    kafkaOffsetReader,
    executorKafkaParams,
    options,
    metadataPath,
    startingStreamOffsets,
    failOnDataLoss(caseInsensitiveParams))
}
|
||||
|
||||
/**
|
||||
* Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousDataReader]] to read
|
||||
* Kafka data in a continuous streaming query.
|
||||
*/
|
||||
override def createContinuousReader(
|
||||
schema: Optional[StructType],
|
||||
metadataPath: String,
|
||||
|
@ -408,8 +455,27 @@ private[kafka010] object KafkaSourceProvider extends Logging {
|
|||
private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
|
||||
private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
|
||||
private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
|
||||
|
||||
val TOPIC_OPTION_KEY = "topic"
|
||||
|
||||
val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
|
||||
"""
|
||||
|Some data may have been lost because they are not available in Kafka any more; either the
|
||||
| data was aged out by Kafka or the topic may have been deleted before all the data in the
|
||||
| topic was processed. If you want your streaming query to fail on such cases, set the source
|
||||
| option "failOnDataLoss" to "true".
|
||||
""".stripMargin
|
||||
|
||||
val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE =
|
||||
"""
|
||||
|Some data may have been lost because they are not available in Kafka any more; either the
|
||||
| data was aged out by Kafka or the topic may have been deleted before all the data in the
|
||||
| topic was processed. If you don't want your streaming query to fail on such cases, set the
|
||||
| source option "failOnDataLoss" to "false".
|
||||
""".stripMargin
|
||||
|
||||
|
||||
|
||||
private val deserClassName = classOf[ByteArrayDeserializer].getName
|
||||
|
||||
def getKafkaOffsetRangeLimit(
|
||||
|
|
2
external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin
vendored
Normal file
2
external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
0v99999
|
||||
{"kafka-initial-offset-future-version":{"2":2,"1":1,"0":0}}
|
|
@ -1 +1 @@
|
|||
2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
|
||||
2{"kafka-initial-offset-2-1-0":{"2":2,"1":1,"0":0}}
|
|
@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
|
|||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.io.Source
|
||||
import scala.util.Random
|
||||
|
||||
import org.apache.kafka.clients.producer.RecordMetadata
|
||||
|
@ -42,7 +43,6 @@ import org.apache.spark.sql.kafka010.KafkaSourceProvider._
|
|||
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
|
||||
import org.apache.spark.sql.streaming.util.StreamManualClock
|
||||
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
|
||||
|
||||
|
@ -112,14 +112,18 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
|
|||
query.nonEmpty,
|
||||
"Cannot add data when there is no query for finding the active kafka source")
|
||||
|
||||
val sources = query.get.logicalPlan.collect {
|
||||
case StreamingExecutionRelation(source: KafkaSource, _) => source
|
||||
} ++ (query.get.lastExecution match {
|
||||
case null => Seq()
|
||||
case e => e.logical.collect {
|
||||
case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
|
||||
}
|
||||
})
|
||||
val sources = {
|
||||
query.get.logicalPlan.collect {
|
||||
case StreamingExecutionRelation(source: KafkaSource, _) => source
|
||||
case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => source
|
||||
} ++ (query.get.lastExecution match {
|
||||
case null => Seq()
|
||||
case e => e.logical.collect {
|
||||
case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
|
||||
}
|
||||
})
|
||||
}.distinct
|
||||
|
||||
if (sources.isEmpty) {
|
||||
throw new Exception(
|
||||
"Could not find Kafka source in the StreamExecution logical plan to add data to")
|
||||
|
@ -155,7 +159,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
|
|||
protected def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
|
||||
}
|
||||
|
||||
class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
|
||||
abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
|
||||
|
||||
import testImplicits._
|
||||
|
||||
|
@ -303,94 +307,105 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
|
|||
)
|
||||
}
|
||||
|
||||
testWithUninterruptibleThread(
|
||||
"deserialization of initial offset with Spark 2.1.0") {
|
||||
test("ensure that initial offset are written with an extra byte in the beginning (SPARK-19517)") {
|
||||
withTempDir { metadataPath =>
|
||||
val topic = newTopic
|
||||
testUtils.createTopic(topic, partitions = 3)
|
||||
val topic = "kafka-initial-offset-current"
|
||||
testUtils.createTopic(topic, partitions = 1)
|
||||
|
||||
val provider = new KafkaSourceProvider
|
||||
val parameters = Map(
|
||||
"kafka.bootstrap.servers" -> testUtils.brokerAddress,
|
||||
"subscribe" -> topic
|
||||
)
|
||||
val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
|
||||
"", parameters)
|
||||
source.getOffset.get // Write initial offset
|
||||
val initialOffsetFile = Paths.get(s"${metadataPath.getAbsolutePath}/sources/0/0").toFile
|
||||
|
||||
// Make sure Spark 2.1.0 will throw an exception when reading the new log
|
||||
intercept[java.lang.IllegalArgumentException] {
|
||||
// Simulate how Spark 2.1.0 reads the log
|
||||
Utils.tryWithResource(new FileInputStream(metadataPath.getAbsolutePath + "/0")) { in =>
|
||||
val length = in.read()
|
||||
val bytes = new Array[Byte](length)
|
||||
in.read(bytes)
|
||||
KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
|
||||
}
|
||||
val df = spark
|
||||
.readStream
|
||||
.format("kafka")
|
||||
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
|
||||
.option("subscribe", topic)
|
||||
.option("startingOffsets", s"earliest")
|
||||
.load()
|
||||
|
||||
// Test that the written initial offset file has a 0 byte at the beginning, so that
|
||||
// Spark 2.1.0 can read the offsets (see SPARK-19517)
|
||||
testStream(df)(
|
||||
StartStream(checkpointLocation = metadataPath.getAbsolutePath),
|
||||
makeSureGetOffsetCalled)
|
||||
|
||||
val binarySource = Source.fromFile(initialOffsetFile)
|
||||
try {
|
||||
assert(binarySource.next().toInt == 0) // first byte is binary 0
|
||||
} finally {
|
||||
binarySource.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
testWithUninterruptibleThread("deserialization of initial offset written by Spark 2.1.0") {
|
||||
test("deserialization of initial offset written by Spark 2.1.0 (SPARK-19517)") {
|
||||
withTempDir { metadataPath =>
|
||||
val topic = "kafka-initial-offset-2-1-0"
|
||||
testUtils.createTopic(topic, partitions = 3)
|
||||
testUtils.sendMessages(topic, Array("0", "1", "2"), Some(0))
|
||||
testUtils.sendMessages(topic, Array("0", "10", "20"), Some(1))
|
||||
testUtils.sendMessages(topic, Array("0", "100", "200"), Some(2))
|
||||
|
||||
val provider = new KafkaSourceProvider
|
||||
val parameters = Map(
|
||||
"kafka.bootstrap.servers" -> testUtils.brokerAddress,
|
||||
"subscribe" -> topic
|
||||
)
|
||||
|
||||
// Copy the initial offset file into the right location inside the checkpoint root directory
|
||||
// such that the Kafka source can read it for initial offsets.
|
||||
val from = new File(
|
||||
getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").toURI).toPath
|
||||
val to = Paths.get(s"${metadataPath.getAbsolutePath}/0")
|
||||
val to = Paths.get(s"${metadataPath.getAbsolutePath}/sources/0/0")
|
||||
Files.createDirectories(to.getParent)
|
||||
Files.copy(from, to)
|
||||
|
||||
val source = provider.createSource(
|
||||
spark.sqlContext, metadataPath.toURI.toString, None, "", parameters)
|
||||
val deserializedOffset = source.getOffset.get
|
||||
val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
|
||||
assert(referenceOffset == deserializedOffset)
|
||||
val df = spark
|
||||
.readStream
|
||||
.format("kafka")
|
||||
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
|
||||
.option("subscribe", topic)
|
||||
.option("startingOffsets", s"earliest")
|
||||
.load()
|
||||
.selectExpr("CAST(value AS STRING)")
|
||||
.as[String]
|
||||
.map(_.toInt)
|
||||
|
||||
// Test that the query starts from the expected initial offset (i.e. the offsets stored in the
|
||||
// checkpoint file are used, even though startingOffsets is earliest).
|
||||
testStream(df)(
|
||||
StartStream(checkpointLocation = metadataPath.getAbsolutePath),
|
||||
AddKafkaData(Set(topic), 1000),
|
||||
CheckAnswer(0, 1, 2, 10, 20, 200, 1000))
|
||||
}
|
||||
}
|
||||
|
||||
testWithUninterruptibleThread("deserialization of initial offset written by future version") {
|
||||
test("deserialization of initial offset written by future version") {
|
||||
withTempDir { metadataPath =>
|
||||
val futureMetadataLog =
|
||||
new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession,
|
||||
metadataPath.getAbsolutePath) {
|
||||
override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
|
||||
out.write(0)
|
||||
val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
|
||||
writer.write(s"v99999\n${metadata.json}")
|
||||
writer.flush
|
||||
}
|
||||
}
|
||||
|
||||
val topic = newTopic
|
||||
val topic = "kafka-initial-offset-future-version"
|
||||
testUtils.createTopic(topic, partitions = 3)
|
||||
val offset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
|
||||
futureMetadataLog.add(0, offset)
|
||||
|
||||
val provider = new KafkaSourceProvider
|
||||
val parameters = Map(
|
||||
"kafka.bootstrap.servers" -> testUtils.brokerAddress,
|
||||
"subscribe" -> topic
|
||||
)
|
||||
val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
|
||||
"", parameters)
|
||||
// Copy the initial offset file into the right location inside the checkpoint root directory
|
||||
// such that the Kafka source can read it for initial offsets.
|
||||
val from = new File(
|
||||
getClass.getResource("/kafka-source-initial-offset-future-version.bin").toURI).toPath
|
||||
val to = Paths.get(s"${metadataPath.getAbsolutePath}/sources/0/0")
|
||||
Files.createDirectories(to.getParent)
|
||||
Files.copy(from, to)
|
||||
|
||||
val e = intercept[java.lang.IllegalStateException] {
|
||||
source.getOffset.get // Read initial offset
|
||||
}
|
||||
val df = spark
|
||||
.readStream
|
||||
.format("kafka")
|
||||
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
|
||||
.option("subscribe", topic)
|
||||
.load()
|
||||
.selectExpr("CAST(value AS STRING)")
|
||||
.as[String]
|
||||
.map(_.toInt)
|
||||
|
||||
Seq(
|
||||
s"maximum supported log version is v${KafkaSource.VERSION}, but encountered v99999",
|
||||
"produced by a newer version of Spark and cannot be read by this version"
|
||||
).foreach { message =>
|
||||
assert(e.getMessage.contains(message))
|
||||
}
|
||||
testStream(df)(
|
||||
StartStream(checkpointLocation = metadataPath.getAbsolutePath),
|
||||
ExpectFailure[IllegalStateException](e => {
|
||||
Seq(
|
||||
s"maximum supported log version is v1, but encountered v99999",
|
||||
"produced by a newer version of Spark and cannot be read by this version"
|
||||
).foreach { message =>
|
||||
assert(e.toString.contains(message))
|
||||
}
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -542,6 +557,91 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
|
|||
CheckLastBatch(120 to 124: _*)
|
||||
)
|
||||
}
|
||||
|
||||
// Self-joins a single Kafka stream on `key` and checks the joined rows after each batch of data.
// NOTE(review): the body only checks the join output; the "only one offset in offset log"
// property named in the title is not asserted directly here - confirm how it is enforced.
test("ensure stream-stream self-join generates only one offset in offset log") {
|
||||
val topic = newTopic()
|
||||
testUtils.createTopic(topic, partitions = 2)
|
||||
// Precondition: the broker reports latest offsets for both partitions of the new topic.
require(testUtils.getLatestOffsets(Set(topic)).size === 2)
|
||||
|
||||
val kafka = spark
|
||||
.readStream
|
||||
.format("kafka")
|
||||
.option("subscribe", topic)
|
||||
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
|
||||
.option("kafka.metadata.max.age.ms", "1")
|
||||
.load()
|
||||
|
||||
// Two integer columns derived from the record value: the value itself and value % 5 as `key`.
val values = kafka
|
||||
.selectExpr("CAST(CAST(value AS STRING) AS INT) AS value",
|
||||
"CAST(CAST(value AS STRING) AS INT) % 5 AS key")
|
||||
|
||||
// Both sides of the join are the same DataFrame, joined on `key`.
val join = values.join(values, "key")
|
||||
|
||||
testStream(join)(
|
||||
makeSureGetOffsetCalled,
|
||||
AddKafkaData(Set(topic), 1, 2),
|
||||
// Expected rows are (key, value, value): 1 and 2 each match only themselves.
CheckAnswer((1, 1, 1), (2, 2, 2)),
|
||||
AddKafkaData(Set(topic), 6, 3),
|
||||
// 6 % 5 == 1, so 6 shares key 1 with the value 1 and yields the cross combinations.
CheckAnswer((1, 1, 1), (2, 2, 2), (3, 3, 3), (1, 6, 1), (1, 1, 6), (1, 6, 6))
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
 * Variant of KafkaMicroBatchSourceSuiteBase that, in beforeAll, sets
 * "spark.sql.streaming.disabledV2MicroBatchReaders" to the canonical class name of
 * KafkaSourceProvider, and adds one test asserting that a KafkaSource is present in the
 * query's logical plan.
 * NOTE(review): the conf is not reset anywhere in this block - presumably the Spark session is
 * scoped to the suite; confirm.
 */
class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase {
|
||||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
spark.conf.set(
|
||||
"spark.sql.streaming.disabledV2MicroBatchReaders",
|
||||
classOf[KafkaSourceProvider].getCanonicalName)
|
||||
}
|
||||
|
||||
test("V1 Source is used when disabled through SQLConf") {
|
||||
val topic = newTopic()
|
||||
testUtils.createTopic(topic, partitions = 5)
|
||||
|
||||
val kafka = spark
|
||||
.readStream
|
||||
.format("kafka")
|
||||
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
|
||||
.option("kafka.metadata.max.age.ms", "1")
|
||||
.option("subscribePattern", s"$topic.*")
|
||||
.load()
|
||||
|
||||
testStream(kafka)(
|
||||
makeSureGetOffsetCalled,
|
||||
// Holds only if the logical plan has a StreamingExecutionRelation wrapping a KafkaSource.
AssertOnQuery { query =>
|
||||
query.logicalPlan.collect {
|
||||
case StreamingExecutionRelation(_: KafkaSource, _) => true
|
||||
}.nonEmpty
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Variant of KafkaMicroBatchSourceSuiteBase that sets no extra configuration and adds one test
 * asserting that a KafkaMicroBatchReader is present in the query's logical plan.
 */
class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
|
||||
|
||||
test("V2 Source is used by default") {
|
||||
val topic = newTopic()
|
||||
testUtils.createTopic(topic, partitions = 5)
|
||||
|
||||
val kafka = spark
|
||||
.readStream
|
||||
.format("kafka")
|
||||
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
|
||||
.option("kafka.metadata.max.age.ms", "1")
|
||||
.option("subscribePattern", s"$topic.*")
|
||||
.load()
|
||||
|
||||
testStream(kafka)(
|
||||
makeSureGetOffsetCalled,
|
||||
// Holds only if the logical plan has a StreamingExecutionRelation wrapping a
// KafkaMicroBatchReader.
AssertOnQuery { query =>
|
||||
query.logicalPlan.collect {
|
||||
case StreamingExecutionRelation(_: KafkaMicroBatchReader, _) => true
|
||||
}.nonEmpty
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
|
|
@ -1146,10 +1146,20 @@ object SQLConf {
|
|||
val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
|
||||
.internal()
|
||||
.doc("A comma-separated list of fully qualified data source register class names for which" +
|
||||
" StreamWriteSupport is disabled. Writes to these sources will fail back to the V1 Sink.")
|
||||
" StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.")
|
||||
.stringConf
|
||||
.createWithDefault("")
|
||||
|
||||
// Internal string conf "spark.sql.streaming.disabledV2MicroBatchReaders"; the default is the
// empty string. Its value is a comma-separated list of data source register class names.
val DISABLED_V2_STREAMING_MICROBATCH_READERS =
|
||||
buildConf("spark.sql.streaming.disabledV2MicroBatchReaders")
|
||||
.internal()
|
||||
.doc(
|
||||
"A comma-separated list of fully qualified data source register class names for which " +
|
||||
"MicroBatchReadSupport is disabled. Reads from these sources will fall back to the " +
|
||||
"V1 Sources.")
|
||||
.stringConf
|
||||
.createWithDefault("")
|
||||
|
||||
// Enumeration with exactly two values, STATIC and DYNAMIC.
// NOTE(review): presumably selects how partition overwrites behave - the meaning of each value
// is not visible here; confirm against the conf that consumes it.
object PartitionOverwriteMode extends Enumeration {
|
||||
val STATIC, DYNAMIC = Value
|
||||
}
|
||||
|
@ -1525,6 +1535,9 @@ class SQLConf extends Serializable with Logging {
|
|||
|
||||
def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS)
|
||||
|
||||
/** Returns the string value of DISABLED_V2_STREAMING_MICROBATCH_READERS obtained via getConf. */
def disabledV2StreamingMicroBatchReaders: String =
|
||||
getConf(DISABLED_V2_STREAMING_MICROBATCH_READERS)
|
||||
|
||||
def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING)
|
||||
|
||||
def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)
|
||||
|
|
|
@ -72,27 +72,36 @@ class MicroBatchExecution(
|
|||
// Note that we have to use the previous `output` as attributes in StreamingExecutionRelation,
|
||||
// since the existing logical plan has already used those attributes. The per-microbatch
|
||||
// transformation is responsible for replacing attributes with their final values.
|
||||
|
||||
val disabledSources =
|
||||
sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",")
|
||||
|
||||
val _logicalPlan = analyzedPlan.transform {
|
||||
case streamingRelation@StreamingRelation(dataSource, _, output) =>
|
||||
case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) =>
|
||||
toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
|
||||
// Materialize source to avoid creating it in every batch
|
||||
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
|
||||
val source = dataSource.createSource(metadataPath)
|
||||
val source = dataSourceV1.createSource(metadataPath)
|
||||
nextSourceId += 1
|
||||
logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]")
|
||||
StreamingExecutionRelation(source, output)(sparkSession)
|
||||
})
|
||||
case s @ StreamingRelationV2(source: MicroBatchReadSupport, _, options, output, _) =>
|
||||
case s @ StreamingRelationV2(
|
||||
dataSourceV2: MicroBatchReadSupport, sourceName, options, output, _) if
|
||||
!disabledSources.contains(dataSourceV2.getClass.getCanonicalName) =>
|
||||
v2ToExecutionRelationMap.getOrElseUpdate(s, {
|
||||
// Materialize source to avoid creating it in every batch
|
||||
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
|
||||
val reader = source.createMicroBatchReader(
|
||||
val reader = dataSourceV2.createMicroBatchReader(
|
||||
Optional.empty(), // user specified schema
|
||||
metadataPath,
|
||||
new DataSourceOptions(options.asJava))
|
||||
nextSourceId += 1
|
||||
logInfo(s"Using MicroBatchReader [$reader] from " +
|
||||
s"DataSourceV2 named '$sourceName' [$dataSourceV2]")
|
||||
StreamingExecutionRelation(reader, output)(sparkSession)
|
||||
})
|
||||
case s @ StreamingRelationV2(_, sourceName, _, output, v1Relation) =>
|
||||
case s @ StreamingRelationV2(dataSourceV2, sourceName, _, output, v1Relation) =>
|
||||
v2ToExecutionRelationMap.getOrElseUpdate(s, {
|
||||
// Materialize source to avoid creating it in every batch
|
||||
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
|
||||
|
@ -102,6 +111,7 @@ class MicroBatchExecution(
|
|||
}
|
||||
val source = v1Relation.get.dataSource.createSource(metadataPath)
|
||||
nextSourceId += 1
|
||||
logInfo(s"Using Source [$source] from DataSourceV2 named '$sourceName' [$dataSourceV2]")
|
||||
StreamingExecutionRelation(source, output)(sparkSession)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue