[SPARK-23539][SS][FOLLOWUP][TESTS] Add UT to ensure existing query doesn't break with default conf of includeHeaders

### What changes were proposed in this pull request?

This patch adds a new UT to ensure that an existing query (from before Spark 3.0.0) with a checkpoint doesn't break under the default configuration of "includeHeaders", which was introduced via SPARK-23539.

This patch also modifies the existing test that checks column types so that it verifies the headers column as well.
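
For context, a minimal sketch (not part of this patch) of how a query opts in to record headers, assuming an active SparkSession `spark`; the broker address and topic name are placeholders. With the default `includeHeaders` of `false` the source schema is unchanged, which is why queries and checkpoints written before Spark 3.0.0 keep working:

```scala
// Hypothetical reader: "host1:9092" and "some-topic" are placeholders.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "some-topic")
  // Default is "false": no "headers" column is added, so pre-3.0.0
  // queries restarted from their old checkpoints see the same schema.
  .option("includeHeaders", "true")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
```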

### Why are the changes needed?

The patch adds missing tests that guarantee backward compatibility of the change introduced in SPARK-23539.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

UT passed.

Closes #25792 from HeartSaVioR/SPARK-23539-FOLLOWUP.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Authored by Jungtaek Lim (HeartSaVioR) on 2019-09-16 15:22:04 -05:00; committed by Sean Owen
parent 5881871ca5
commit 88c8d5eed2
10 changed files with 84 additions and 2 deletions

@@ -0,0 +1 @@
{"id":"fc415a71-f0a2-4c3c-aeaf-f9e258c3f726"}

@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1568508285207,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5"}}
{"spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62":{"2":3,"4":3,"1":3,"3":3,"0":3}}

@@ -28,12 +28,13 @@ import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.Random
import org.apache.commons.io.FileUtils
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
@@ -47,6 +48,7 @@ import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
abstract class KafkaSourceTest extends StreamTest with SharedSparkSession with KafkaTest {
@@ -1162,6 +1164,62 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
    intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) }
  }

  test("default config of includeHeader doesn't break existing query from Spark 2.4") {
    import testImplicits._

    // This topic name is migrated from Spark 2.4.3 test run
    val topic = "spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62"
    // create same topic and messages as test run
    testUtils.createTopic(topic, partitions = 5, overwrite = true)
    testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0))
    testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1))
    testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2))
    testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3))
    testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4))
    require(testUtils.getLatestOffsets(Set(topic)).size === 5)

    (31 to 35).map { num =>
      (num - 31, (num.toString, Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8)))))
    }.foreach { rec => testUtils.sendMessage(topic, rec._2, Some(rec._1)) }

    val kafka = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("kafka.metadata.max.age.ms", "1")
      .option("subscribePattern", topic)
      .option("startingOffsets", "earliest")
      .load()

    val query = kafka.dropDuplicates()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .map(kv => kv._2.toInt + 1)

    val resourceUri = this.getClass.getResource(
      "/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/").toURI

    val checkpointDir = Utils.createTempDir().getCanonicalFile
    // Copy the checkpoint to a temp dir to prevent changes to the original.
    // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
    FileUtils.copyDirectory(new File(resourceUri), checkpointDir)

    testStream(query)(
      StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
      /*
        Note: The checkpoint was generated using the following input in Spark version 2.4.3
        testUtils.createTopic(topic, partitions = 5, overwrite = true)
        testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0))
        testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1))
        testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2))
        testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3))
        testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4))
       */
      makeSureGetOffsetCalled,
      CheckNewAnswer(32, 33, 34, 35, 36)
    )
  }
}
abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
@@ -1414,7 +1472,9 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
    val now = System.currentTimeMillis()
    val topic = newTopic()
    testUtils.createTopic(newTopic(), partitions = 1)
    testUtils.sendMessage(
      topic, ("1", Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8)))), None
    )

    val kafka = spark
      .readStream
@@ -1423,6 +1483,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
      .option("kafka.metadata.max.age.ms", "1")
      .option("startingOffsets", s"earliest")
      .option("subscribe", topic)
      .option("includeHeaders", "true")
      .load()

    val query = kafka
@@ -1445,6 +1506,21 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
    // producer. So here we just use a low bound to make sure the internal conversion works.
    assert(row.getAs[java.sql.Timestamp]("timestamp").getTime >= now, s"Unexpected results: $row")
    assert(row.getAs[Int]("timestampType") === 0, s"Unexpected results: $row")

    def checkHeader(row: Row, expected: Seq[(String, Array[Byte])]): Unit = {
      // array<struct<key:string,value:binary>>
      val headers = row.getList[Row](row.fieldIndex("headers")).asScala
      assert(headers.length === expected.length)
      (0 until expected.length).foreach { idx =>
        val key = headers(idx).getAs[String]("key")
        val value = headers(idx).getAs[Array[Byte]]("value")
        assert(key === expected(idx)._1)
        assert(value === expected(idx)._2)
      }
    }
    checkHeader(row, Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8))))

    query.stop()
  }
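
For reference, a sketch (illustrative only, not code from this patch) of the Spark SQL type that `checkHeader` walks, mirroring the `array<struct<key:string,value:binary>>` comment above:

```scala
import org.apache.spark.sql.types._

// Each array element is one Kafka header: a string key plus a binary value.
val headersType: DataType = ArrayType(
  StructType(Seq(
    StructField("key", StringType),
    StructField("value", BinaryType)
  ))
)
```

Because the column is an array of structs, the test reads it back with `row.getList[Row](row.fieldIndex("headers"))` and compares keys and values element by element.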