[SPARK-35095][SS][TESTS] Use ANSI intervals in streaming join tests

### What changes were proposed in this pull request?

This PR extends the following tests to use day-time intervals.

* StreamingOuterJoinSuite.right outer with watermark range condition
* StreamingOuterJoinSuite.left outer with watermark range condition

### Why are the changes needed?

Currently, there are no tests to use day-time intervals.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New assertions.

Closes #32953 from sarutak/stream-join-interval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
This commit is contained in:
Kousuke Saruta 2021-06-17 22:48:18 +03:00 committed by Max Gekk
parent 7fcb127674
commit 45b7f76295

View file

@ -137,7 +137,11 @@ abstract class StreamingJoinSuite
(leftInput, rightInput, select)
}
protected def setupJoinWithRangeCondition(joinType: String)
protected def setupJoinWithRangeCondition(
joinType: String,
watermark: String = "10 seconds",
lowerBound: String = "interval 5 seconds",
upperBound: String = "interval 5 seconds")
: (MemoryStream[(Int, Int)], MemoryStream[(Int, Int)], DataFrame) = {
val leftInput = MemoryStream[(Int, Int)]
@ -145,18 +149,18 @@ abstract class StreamingJoinSuite
val df1 = leftInput.toDF.toDF("leftKey", "time")
.select('leftKey, timestamp_seconds($"time") as "leftTime", ('leftKey * 2) as "leftValue")
.withWatermark("leftTime", "10 seconds")
.withWatermark("leftTime", watermark)
val df2 = rightInput.toDF.toDF("rightKey", "time")
.select('rightKey, timestamp_seconds($"time") as "rightTime",
('rightKey * 3) as "rightValue")
.withWatermark("rightTime", "10 seconds")
.withWatermark("rightTime", watermark)
val joined =
df1.join(
df2,
expr("leftKey = rightKey AND " +
"leftTime BETWEEN rightTime - interval 5 seconds AND rightTime + interval 5 seconds"),
s"leftTime BETWEEN rightTime - $lowerBound AND rightTime + $upperBound"),
joinType)
val select = if (joinType == "left_semi") {
@ -777,6 +781,35 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite {
CheckNewAnswer(outerResult),
assertNumStateRows(total = 2, updated = 1)
)
Seq(
("10 minutes",
"interval 3 minutes 30 seconds"),
("10 minutes",
"interval '3:30' minute to second")).foreach { case (watermark, bound) =>
val (leftInput2, rightInput2, joined2) =
setupJoinWithRangeCondition(
joinType,
watermark,
bound,
bound)
testStream(joined2)(
AddData(leftInput2, (1, 210), (3, 5)),
CheckAnswer(),
AddData(rightInput2, (1, 300), (2, 5)),
CheckNewAnswer((1, 1, 210, 300)),
AddData(rightInput2, (1, 450)),
CheckNewAnswer(),
assertNumStateRows(total = 5, updated = 5),
AddData(leftInput2, (1, 260), (1, 1800)),
CheckNewAnswer((1, 1, 260, 300), (1, 1, 260, 450)),
assertNumStateRows(total = 7, updated = 2),
AddData(rightInput2, (0, 1800)),
CheckNewAnswer(outerResult),
assertNumStateRows(total = 2, updated = 1)
)
}
}
}