From 3c4e61918fc8266368bd33ea0612144de77571e6 Mon Sep 17 00:00:00 2001 From: bettermouse Date: Tue, 21 Jan 2020 21:37:21 -0800 Subject: [PATCH] [SPARK-30553][DOCS] fix structured-streaming java example error # What changes were proposed in this pull request? Fix structured-streaming java example error. ```java Dataset windowedCounts = words .withWatermark("timestamp", "10 minutes") .groupBy( functions.window(words.col("timestamp"), "10 minutes", "5 minutes"), words.col("word")) .count(); ``` It does not clean up old state.May cause OOM > Before the fix ```scala == Physical Plan == WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter48e331f0 +- *(4) HashAggregate(keys=[window#13, word#4], functions=[count(1)], output=[window#13, word#4, count#12L]) +- StateStoreSave [window#13, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-91124080-0e20-41c0-9150-91735bdc22c0/state, runId = 5c425536-a3ae-4385-8167-5fa529e6760d, opId = 0, ver = 6, numPartitions = 1], Update, 1579530890886, 2 +- *(3) HashAggregate(keys=[window#13, word#4], functions=[merge_count(1)], output=[window#13, word#4, count#23L]) +- StateStoreRestore [window#13, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-91124080-0e20-41c0-9150-91735bdc22c0/state, runId = 5c425536-a3ae-4385-8167-5fa529e6760d, opId = 0, ver = 6, numPartitions = 1], 2 +- *(2) HashAggregate(keys=[window#13, word#4], functions=[merge_count(1)], output=[window#13, word#4, count#23L]) +- Exchange hashpartitioning(window#13, word#4, 1) +- *(1) HashAggregate(keys=[window#13, word#4], functions=[partial_count(1)], output=[window#13, word#4, count#23L]) +- *(1) Project [window#13, word#4] +- *(1) Filter (((isnotnull(timestamp#5) && isnotnull(window#13)) && (timestamp#5 >= window#13.start)) && (timestamp#5 < window#13.end)) +- *(1) Expand [List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 0) - 2) * 300000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 0) - 2) * 300000000) + 600000000), LongType, TimestampType)), word#4, timestamp#5-T600000ms), List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 1) - 2) * 300000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 1) - 2) * 300000000) + 600000000), LongType, TimestampType)), word#4, timestamp#5-T600000ms)], [window#13, word#4, timestamp#5-T600000ms] +- EventTimeWatermark timestamp#5: timestamp, interval 10 minutes +- LocalTableScan , [word#4, timestamp#5] ``` > After the fix ```scala == Physical Plan == WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter1df12a96 +- *(4) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[count(1)], output=[window#8-T600000ms, word#4, count#12L]) +- StateStoreSave [window#13-T600000ms, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-95ac74cc-aca6-42eb-827d-7586aa69bcd3/state, runId = 91fa311d-d47e-4726-9d0a-f21ef268d9d0, opId = 0, ver = 4, numPartitions = 1], Update, 1579529975342, 2 +- *(3) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[merge_count(1)], output=[window#13-T600000ms, word#4, count#23L]) +- StateStoreRestore [window#13-T600000ms, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-95ac74cc-aca6-42eb-827d-7586aa69bcd3/state, runId = 91fa311d-d47e-4726-9d0a-f21ef268d9d0, opId = 0, ver = 4, numPartitions = 1], 2 +- *(2) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[merge_count(1)], output=[window#13-T600000ms, word#4, count#23L]) +- Exchange hashpartitioning(window#13-T600000ms, word#4, 1) +- *(1) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[partial_count(1)], output=[window#13-T600000ms, word#4, count#23L]) +- *(1) Project [window#13-T600000ms, word#4] +- *(1) Filter (((isnotnull(timestamp#5-T600000ms) && isnotnull(window#13-T600000ms)) && (timestamp#5-T600000ms >= window#13-T600000ms.start)) && (timestamp#5-T600000ms < window#13-T600000ms.end)) +- *(1) Expand [List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 0) - 2) * 300000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 0) - 2) * 300000000) + 600000000), LongType, TimestampType)), word#4, timestamp#5-T600000ms), List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 1) - 2) * 300000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 1) - 2) * 300000000) + 600000000), LongType, TimestampType)), word#4, timestamp#5-T600000ms)], [window#13-T600000ms, word#4, timestamp#5-T600000ms] +- EventTimeWatermark timestamp#5: timestamp, interval 10 minutes +- LocalTableScan , [word#4, timestamp#5] ``` ### Why are the changes needed? If we write the code according to the documentation.It does not clean up old state.May cause OOM ### Does this PR introduce any user-facing change? No ### How was this patch tested? ```java SparkSession spark = SparkSession.builder().appName("test").master("local[*]") .config("spark.sql.shuffle.partitions", 1) .getOrCreate(); Dataset lines = spark.readStream().format("socket") .option("host", "skynet") .option("includeTimestamp", true) .option("port", 8888).load(); Dataset words = lines.toDF("word", "timestamp"); Dataset windowedCounts = words .withWatermark("timestamp", "10 minutes") .groupBy( window(col("timestamp"), "10 minutes", "5 minutes"), col("word")) .count(); StreamingQuery start = windowedCounts.writeStream() .outputMode("update") .format("console").start(); start.awaitTermination(); ``` We can write an example like this.And input some date 1. see the matrics `stateOnCurrentVersionSizeBytes` in log.Is it increasing all the time? 2. see the Physical Plan.Whether it contains things like `HashAggregate(keys=[window#11-T10000ms, value#39]` 3. We can debug in `storeManager.remove(store, keyRow)`.Whether it will remove the old state. Closes #27268 from bettermouse/spark-30553. Authored-by: bettermouse Signed-off-by: Dongjoon Hyun --- docs/structured-streaming-programming-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 429d456889..727bda18a1 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -982,8 +982,8 @@ Dataset words = ... // streaming DataFrame of schema { timestamp: Timestamp Dataset windowedCounts = words .withWatermark("timestamp", "10 minutes") .groupBy( - functions.window(words.col("timestamp"), "10 minutes", "5 minutes"), - words.col("word")) + window(col("timestamp"), "10 minutes", "5 minutes"), + col("word")) .count(); {% endhighlight %}