diff --git a/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/commits/0 b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/commits/0
new file mode 100644
index 0000000000..9c1e3021c3
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/commits/0
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git a/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/metadata b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/metadata
new file mode 100644
index 0000000000..f1b5ab7aa1
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/metadata
@@ -0,0 +1 @@
+{"id":"fc415a71-f0a2-4c3c-aeaf-f9e258c3f726"}
\ No newline at end of file
diff --git a/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/offsets/0 b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/offsets/0
new file mode 100644
index 0000000000..5dbadea57a
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1568508285207,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5"}}
+{"spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62":{"2":3,"4":3,"1":3,"3":3,"0":3}}
\ No newline at end of file
diff --git a/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/sources/0/0 b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/sources/0/0
new file mode 100644
index 0000000000..8cf9f8e009
Binary files /dev/null and b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/sources/0/0 differ
diff --git a/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/0/1.delta b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/0/1.delta
new file mode 100644
index 0000000000..5815bbdcc2
Binary files /dev/null and b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/0/1.delta differ
diff --git a/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/1/1.delta b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/1/1.delta
new file mode 100644
index 0000000000..e1a065b2b1
Binary files /dev/null and b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/1/1.delta differ
diff --git a/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/2/1.delta b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/2/1.delta
new file mode 100644
index 0000000000..cce14294e0
Binary files /dev/null and b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/2/1.delta differ
diff --git a/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/3/1.delta b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/3/1.delta
new file mode 100644
index 0000000000..5706301950
Binary files /dev/null and b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/3/1.delta differ
diff --git a/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/4/1.delta b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/4/1.delta
new file mode 100644
index 0000000000..e8b1e4bdc8
Binary files /dev/null and b/external/kafka-0-10-sql/src/test/resources/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/state/0/4/1.delta differ
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 9fe100431c..609cf3ce4b 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -28,12 +28,13 @@ import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.util.Random
 
+import org.apache.commons.io.FileUtils
 import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
+import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
@@ -47,6 +48,7 @@ import org.apache.spark.sql.streaming.{StreamTest, Trigger}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 abstract class KafkaSourceTest extends StreamTest with SharedSparkSession with KafkaTest {
 
@@ -1162,6 +1164,62 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
     intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) }
   }
 
+  test("default config of includeHeader doesn't break existing query from Spark 2.4") {
+    import testImplicits._
+
+    // This topic name is migrated from Spark 2.4.3 test run
+    val topic = "spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62"
+    // create same topic and messages as test run
+    testUtils.createTopic(topic, partitions = 5, overwrite = true)
+    testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0))
+    testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1))
+    testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2))
+    testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3))
+    testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    (31 to 35).map { num =>
+      (num - 31, (num.toString, Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8)))))
+    }.foreach { rec => testUtils.sendMessage(topic, rec._2, Some(rec._1)) }
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribePattern", topic)
+      .option("startingOffsets", "earliest")
+      .load()
+
+    val query = kafka.dropDuplicates()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+      .map(kv => kv._2.toInt + 1)
+
+    val resourceUri = this.getClass.getResource(
+      "/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/").toURI
+
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    // Copy the checkpoint to a temp dir to prevent changes to the original.
+    // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+    FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+    testStream(query)(
+      StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+      /*
+        Note: The checkpoint was generated using the following input in Spark version 2.4.3
+        testUtils.createTopic(topic, partitions = 5, overwrite = true)
+
+        testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0))
+        testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1))
+        testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2))
+        testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3))
+        testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4))
+      */
+      makeSureGetOffsetCalled,
+      CheckNewAnswer(32, 33, 34, 35, 36)
+    )
+  }
 }
 
 abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
@@ -1414,7 +1472,9 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
     val now = System.currentTimeMillis()
     val topic = newTopic()
     testUtils.createTopic(newTopic(), partitions = 1)
-    testUtils.sendMessages(topic, Array(1).map(_.toString))
+    testUtils.sendMessage(
+      topic, ("1", Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8)))), None
+    )
 
     val kafka = spark
       .readStream
@@ -1423,6 +1483,7 @@
       .option("kafka.metadata.max.age.ms", "1")
      .option("startingOffsets", s"earliest")
       .option("subscribe", topic)
+      .option("includeHeaders", "true")
       .load()
 
     val query = kafka
@@ -1445,6 +1506,21 @@
     // producer. So here we just use a low bound to make sure the internal conversion works.
     assert(row.getAs[java.sql.Timestamp]("timestamp").getTime >= now, s"Unexpected results: $row")
     assert(row.getAs[Int]("timestampType") === 0, s"Unexpected results: $row")
+
+    def checkHeader(row: Row, expected: Seq[(String, Array[Byte])]): Unit = {
+      // array<struct<key:string,value:binary>>
+      val headers = row.getList[Row](row.fieldIndex("headers")).asScala
+      assert(headers.length === expected.length)
+
+      (0 until expected.length).foreach { idx =>
+        val key = headers(idx).getAs[String]("key")
+        val value = headers(idx).getAs[Array[Byte]]("value")
+        assert(key === expected(idx)._1)
+        assert(value === expected(idx)._2)
+      }
+    }
+
+    checkHeader(row, Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8))))
     query.stop()
   }
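Editor's note: the first new test leaves includeHeaders at its default so that a checkpoint written by Spark 2.4.3 keeps replaying, while the second modified test turns the option on and asserts on the headers column through the checkHeader helper. The standalone Scala sketch below (not part of the patch) shows one way a reader outside the test harness could unpack that column using the same getAs pattern; the bootstrap address, topic name, and object name are placeholders, and a readStream with the same options would behave the same way.

// Editor's sketch, not part of the patch. Placeholder broker/topic values.
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._

import org.apache.spark.sql.{Row, SparkSession}

object KafkaHeadersSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-headers-sketch")
      .master("local[2]")
      .getOrCreate()

    // Batch read for brevity; spark.readStream with the same options exposes the same schema.
    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder address
      .option("subscribe", "some-topic")                    // placeholder topic
      .option("startingOffsets", "earliest")
      .option("includeHeaders", "true")
      .load()

    // With includeHeaders=true the schema gains a `headers` column of
    // array<struct<key: string, value: binary>>, as checkHeader above assumes.
    df.select("key", "value", "headers").collect().foreach { row =>
      val headers = Option(row.getList[Row](row.fieldIndex("headers")))
        .map(_.asScala.toSeq)
        .getOrElse(Seq.empty) // guard in case a record carries no headers
      headers.foreach { h =>
        val key = h.getAs[String]("key")
        val value = h.getAs[Array[Byte]]("value")
        println(s"header $key -> ${new String(value, UTF_8)}")
      }
    }

    spark.stop()
  }
}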