[SPARK-19517][SS] KafkaSource fails to initialize partition offsets

## What changes were proposed in this pull request?

This patch fixes a bug in `KafkaSource` with the (de)serialization of the length of the JSON string that contains the initial partition offsets.

## How was this patch tested?

I ran the test suite for spark-sql-kafka-0-10.

Author: Roberto Agostino Vitillo <ra.vitillo@gmail.com>

Closes #16857 from vitillo/kafka_source_fix.
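The root cause is that the previous format stored the JSON's length with `OutputStream.write(int)`, which writes only the low-order eight bits of its argument; once the initial-offsets JSON grows past 255 bytes (easy with many topic partitions), the stored length no longer matches the payload and the source cannot read its own log back. A minimal sketch of the truncation, with illustrative names only:

```scala
// Minimal sketch (illustrative, not from the patch): OutputStream.write(int)
// keeps only the low-order byte, so a 300-byte payload records a length of 44.
import java.io.ByteArrayOutputStream

object LengthTruncationSketch {
  def main(args: Array[String]): Unit = {
    val json = "x" * 300                      // stand-in for the offsets JSON
    val out = new ByteArrayOutputStream()
    out.write(json.getBytes("UTF-8").length)  // writes 300 & 0xFF == 44, not 300
    out.write(json.getBytes("UTF-8"))
    val storedLength = out.toByteArray()(0) & 0xFF
    println(s"actual length = ${json.length}, stored length = $storedLength")
  }
}
```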
commit 1a3f5f8c55 (parent 4cc06f4eb1)
```diff
@@ -105,3 +105,4 @@ org.apache.spark.scheduler.ExternalClusterManager
 org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
 spark-warehouse
 structured-streaming/*
+kafka-source-initial-offset-version-2.1.0.bin
```
```diff
@@ -21,6 +21,7 @@ import java.{util => ju}
 import java.io._
 import java.nio.charset.StandardCharsets

+import org.apache.commons.io.IOUtils
 import org.apache.kafka.common.TopicPartition

 import org.apache.spark.SparkContext
```
```diff
@@ -97,16 +98,31 @@ private[kafka010] class KafkaSource(
     val metadataLog =
       new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
         override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-          val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
-          out.write(bytes.length)
-          out.write(bytes)
+          out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write(VERSION)
+          writer.write(metadata.json)
+          writer.flush
         }

         override def deserialize(in: InputStream): KafkaSourceOffset = {
-          val length = in.read()
-          val bytes = new Array[Byte](length)
-          in.read(bytes)
-          KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8)))
+          in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+          // HDFSMetadataLog guarantees that it never creates a partial file.
+          assert(content.length != 0)
+          if (content(0) == 'v') {
+            if (content.startsWith(VERSION)) {
+              KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length)))
+            } else {
+              val versionInFile = content.substring(0, content.indexOf("\n"))
+              throw new IllegalStateException(
+                s"Unsupported format. Expected version is ${VERSION.stripLineEnd} " +
+                s"but was $versionInFile. Please upgrade your Spark.")
+            }
+          } else {
+            // The log was generated by Spark 2.1.0
+            KafkaSourceOffset(SerializedOffset(content))
+          }
         }
       }
```
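With the new `serialize`, an initial-offset file consists of a single zero byte followed by the `v1\n` version marker and the offsets JSON; a Spark 2.1.0 reader treats that leading byte as a length of zero and fails immediately instead of silently mis-reading the file, while the new `deserialize` accepts both layouts. A small sketch of the resulting layout, using a hypothetical example JSON:

```scala
// Sketch of the on-disk layout produced by the new serialize() (example JSON only):
//   [0x00] ["v1\n"] [offsets JSON]
import java.io.{BufferedWriter, ByteArrayOutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets

object InitialOffsetLayoutSketch {
  def main(args: Array[String]): Unit = {
    val json = """{"topic-0":{"0":0,"1":0}}"""   // hypothetical offsets JSON
    val out = new ByteArrayOutputStream()
    out.write(0)                                 // zero byte kept for Spark 2.1.0 compatibility
    val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
    writer.write("v1\n")                         // VERSION marker
    writer.write(json)
    writer.flush()

    val bytes = out.toByteArray
    assert(bytes(0) == 0)
    assert(new String(bytes.drop(1), StandardCharsets.UTF_8) == "v1\n" + json)
  }
}
```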
```diff
@@ -335,6 +351,8 @@ private[kafka010] object KafkaSource {
       | source option "failOnDataLoss" to "false".
     """.stripMargin

+  private val VERSION = "v1\n"
+
   def getSortedExecutorList(sc: SparkContext): Array[String] = {
     val bm = sc.env.blockManager
     bm.master.getPeers(bm.blockManagerId).toArray
```
New test resource: `external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin` (new file, 1 line)

```diff
@@ -0,0 +1 @@
+2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
```
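This fixture appears to follow the Spark 2.1.0 layout: a single length byte followed by the JSON. The JSON here is 50 bytes long, and byte value 50 displays as the ASCII character `2`, which is why the file seems to start with a `2`. A quick check in the Scala REPL:

```scala
// The fixture's leading character is the old single-byte length prefix rendered as text.
val json = """{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}"""
assert(json.length == 50 && 50.toChar == '2')
```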
```diff
@@ -17,7 +17,9 @@

 package org.apache.spark.sql.kafka010

+import java.io._
 import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
 import java.util.Properties
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
```
```diff
@@ -141,6 +143,108 @@ class KafkaSourceSuite extends KafkaSourceTest {

   private val topicId = new AtomicInteger(0)

+  testWithUninterruptibleThread(
+    "deserialization of initial offset with Spark 2.1.0") {
+    withTempDir { metadataPath =>
+      val topic = newTopic
+      testUtils.createTopic(topic, partitions = 3)
+
+      val provider = new KafkaSourceProvider
+      val parameters = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> topic
+      )
+      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+        "", parameters)
+      source.getOffset.get // Write initial offset
+
+      // Make sure Spark 2.1.0 will throw an exception when reading the new log
+      intercept[java.lang.IllegalArgumentException] {
+        // Simulate how Spark 2.1.0 reads the log
+        val in = new FileInputStream(metadataPath.getAbsolutePath + "/0")
+        val length = in.read()
+        val bytes = new Array[Byte](length)
+        in.read(bytes)
+        KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+      }
+    }
+  }
+
+  testWithUninterruptibleThread("deserialization of initial offset written by Spark 2.1.0") {
+    withTempDir { metadataPath =>
+      val topic = "kafka-initial-offset-2-1-0"
+      testUtils.createTopic(topic, partitions = 3)
+
+      val provider = new KafkaSourceProvider
+      val parameters = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> topic
+      )
+
+      val from = Paths.get(
+        getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").getPath)
+      val to = Paths.get(s"${metadataPath.getAbsolutePath}/0")
+      Files.copy(from, to)
+
+      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+        "", parameters)
+      val deserializedOffset = source.getOffset.get
+      val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
+      assert(referenceOffset == deserializedOffset)
+    }
+  }
+
+  testWithUninterruptibleThread("deserialization of initial offset written by future version") {
+    withTempDir { metadataPath =>
+      val futureMetadataLog =
+        new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession,
+          metadataPath.getAbsolutePath) {
+          override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+            out.write(0)
+            val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
+            writer.write(s"v0\n${metadata.json}")
+            writer.flush
+          }
+        }
+
+      val topic = newTopic
+      testUtils.createTopic(topic, partitions = 3)
+      val offset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
+      futureMetadataLog.add(0, offset)
+
+      val provider = new KafkaSourceProvider
+      val parameters = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> topic
+      )
+      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+        "", parameters)
+
+      val e = intercept[java.lang.IllegalStateException] {
+        source.getOffset.get // Read initial offset
+      }
+
+      assert(e.getMessage.contains("Please upgrade your Spark"))
+    }
+  }
+
+  test("(de)serialization of initial offsets") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 64)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+
+    testStream(reader.load)(
+      makeSureGetOffsetCalled,
+      StopStream,
+      StartStream(),
+      StopStream)
+  }
+
   test("maxOffsetsPerTrigger") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)
```