From 2c64b731ae6a976b0d75a95901db849b4a0e2393 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 12 Nov 2020 15:31:57 -0800 Subject: [PATCH] [SPARK-33259][SS] Disable streaming query with possible correctness issue by default ### What changes were proposed in this pull request? This patch proposes to disable the streaming query with possible correctness issue in chained stateful operators. The behavior can be controlled by a SQL config, so if users understand the risk and still want to run the query, they can disable the check. ### Why are the changes needed? The possible correctness in chained stateful operators in streaming query is not straightforward for users. From users perspective, it will be considered as a Spark bug. It is also possible the worse case, users are not aware of the correctness issue and use wrong results. A better approach should be to disable such queries and let users choose to run the query if they understand there is such risk, instead of implicitly running the query and let users to find out correctness issue by themselves and report this known to Spark community. ### Does this PR introduce _any_ user-facing change? Yes. Streaming query with possible correctness issue will be blocked to run, except for users explicitly disable the SQL config. ### How was this patch tested? Unit test. Closes #30210 from viirya/SPARK-33259. Authored-by: Liang-Chi Hsieh Signed-off-by: Dongjoon Hyun --- docs/ss-migration-guide.md | 6 +++- .../UnsupportedOperationChecker.scala | 19 ++++++++--- .../apache/spark/sql/internal/SQLConf.scala | 18 ++++++++++ .../analysis/UnsupportedOperationsSuite.scala | 34 +++++++++++++------ .../FlatMapGroupsWithStateSuite.scala | 4 ++- 5 files changed, 64 insertions(+), 17 deletions(-) diff --git a/docs/ss-migration-guide.md b/docs/ss-migration-guide.md index 002058b69b..d52b2e095f 100644 --- a/docs/ss-migration-guide.md +++ b/docs/ss-migration-guide.md @@ -26,10 +26,14 @@ Note that this migration guide describes the items specific to Structured Stream Many items of SQL migration can be applied when migrating Structured Streaming to higher versions. Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html). +## Upgrading from Structured Streaming 3.0 to 3.1 + +- In Spark 3.0 and before, for the queries that have stateful operation which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded, Spark only prints a warning message. Since Spark 3.1, Spark will check for such queries with possible correctness issue and throw AnalysisException for it by default. For the users who understand the possible risk of correctness issue and still decide to run the query, please disable this check by setting the config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false. + ## Upgrading from Structured Streaming 2.4 to 3.0 - In Spark 3.0, Structured Streaming forces the source schema into nullable when file-based datasources such as text, json, csv, parquet and orc are used via `spark.readStream(...)`. Previously, it respected the nullability in source schema; however, it caused issues tricky to debug with NPE. To restore the previous behavior, set `spark.sql.streaming.fileSource.schema.forceNullable` to `false`. - Spark 3.0 fixes the correctness issue on Stream-stream outer join, which changes the schema of state. (See [SPARK-26154](https://issues.apache.org/jira/browse/SPARK-26154) for more details). If you start your query from checkpoint constructed from Spark 2.x which uses stream-stream outer join, Spark 3.0 fails the query. To recalculate outputs, discard the checkpoint and replay previous inputs. -- In Spark 3.0, the deprecated class `org.apache.spark.sql.streaming.ProcessingTime` has been removed. Use `org.apache.spark.sql.streaming.Trigger.ProcessingTime` instead. Likewise, `org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger` has been removed in favor of `Trigger.Continuous`, and `org.apache.spark.sql.execution.streaming.OneTimeTrigger` has been hidden in favor of `Trigger.Once`. \ No newline at end of file +- In Spark 3.0, the deprecated class `org.apache.spark.sql.streaming.ProcessingTime` has been removed. Use `org.apache.spark.sql.streaming.Trigger.ProcessingTime` instead. Likewise, `org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger` has been removed in favor of `Trigger.Continuous`, and `org.apache.spark.sql.execution.streaming.OneTimeTrigger` has been hidden in favor of `Trigger.Once`. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 8093234556..814ea8c976 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.OutputMode /** @@ -40,10 +41,15 @@ object UnsupportedOperationChecker extends Logging { } } + /** + * Checks for possible correctness issue in chained stateful operators. The behavior is + * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`. + * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just + * print a warning message. + */ def checkStreamingQueryGlobalWatermarkLimit( plan: LogicalPlan, - outputMode: OutputMode, - failWhenDetected: Boolean): Unit = { + outputMode: OutputMode): Unit = { def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { case s: Aggregate if s.isStreaming && outputMode == InternalOutputModes.Append => true @@ -62,6 +68,8 @@ object UnsupportedOperationChecker extends Logging { case _ => false } + val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled + try { plan.foreach { subPlan => if (isStatefulOperation(subPlan)) { @@ -73,7 +81,10 @@ object UnsupportedOperationChecker extends Logging { "The query contains stateful operation which can emit rows older than " + "the current watermark plus allowed late record delay, which are \"late rows\"" + " in downstream stateful operations and these rows can be discarded. " + - "Please refer the programming guide doc for more details." + "Please refer the programming guide doc for more details. If you understand " + + "the possible risk of correctness issue and still need to run the query, " + + "you can disable this check by setting the config " + + "`spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false." throwError(errorMsg)(plan) } } @@ -388,7 +399,7 @@ object UnsupportedOperationChecker extends Logging { checkUnsupportedExpressions(subPlan) } - checkStreamingQueryGlobalWatermarkLimit(plan, outputMode, failWhenDetected = false) + checkStreamingQueryGlobalWatermarkLimit(plan, outputMode) } def checkForContinuous(plan: LogicalPlan, outputMode: OutputMode): Unit = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ef988052af..546b199950 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1382,6 +1382,21 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED = + buildConf("spark.sql.streaming.statefulOperator.checkCorrectness.enabled") + .internal() + .doc("When true, the stateful operators for streaming query will be checked for possible " + + "correctness issue due to global watermark. The correctness issue comes from queries " + + "containing stateful operation which can emit rows older than the current watermark " + + "plus allowed late record delay, which are \"late rows\" in downstream stateful " + + "operations and these rows can be discarded. Please refer the programming guide doc for " + + "more details. Once the issue is detected, Spark will throw analysis exception. " + + "When this config is disabled, Spark will just print warning message for users. " + + "Prior to Spark 3.1.0, the behavior is disabling this config.") + .version("3.1.0") + .booleanConf + .createWithDefault(true) + val VARIABLE_SUBSTITUTE_ENABLED = buildConf("spark.sql.variable.substitute") .doc("This enables substitution using syntax like `${var}`, `${system:var}`, " + @@ -3017,6 +3032,9 @@ class SQLConf extends Serializable with Logging { def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED) + def statefulOperatorCorrectnessCheckEnabled: Boolean = + getConf(STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED) + def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS) def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index b9943a9744..21dde3ca8c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{FlatMapGroupsWithState, _} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder} import org.apache.spark.unsafe.types.CalendarInterval @@ -36,7 +37,7 @@ import org.apache.spark.unsafe.types.CalendarInterval /** A dummy command for testing unsupported operations. */ case class DummyCommand() extends Command -class UnsupportedOperationsSuite extends SparkFunSuite { +class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { val attribute = AttributeReference("a", IntegerType, nullable = true)() val watermarkMetadata = new MetadataBuilder() @@ -218,6 +219,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite { expectedMsgs = Seq("flatMapGroupsWithState in append mode", "update")) // FlatMapGroupsWithState(Append) in streaming with aggregation + // Only supported when `spark.sql.streaming.statefulOperator.correctnessCheck` is disabled. for (outputMode <- Seq(Append, Update, Complete)) { assertSupportedInStreamingPlan( "flatMapGroupsWithState - flatMapGroupsWithState(Append) " + @@ -228,7 +230,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite { FlatMapGroupsWithState( null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null, streamRelation)), - outputMode = outputMode) + outputMode = outputMode, + SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false") } for (outputMode <- Seq(Append, Update)) { @@ -268,6 +271,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite { } // multiple FlatMapGroupsWithStates + // Only supported when `spark.sql.streaming.statefulOperator.correctnessCheck` is disabled. assertSupportedInStreamingPlan( "flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are " + "in append mode", @@ -275,7 +279,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite { isMapGroupsWithState = false, null, FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null, streamRelation)), - outputMode = Append) + outputMode = Append, + SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false") assertNotSupportedInStreamingPlan( "flatMapGroupsWithState - multiple flatMapGroupsWithStates on s streaming relation but some" + @@ -995,9 +1000,12 @@ class UnsupportedOperationsSuite extends SparkFunSuite { def assertSupportedInStreamingPlan( name: String, plan: LogicalPlan, - outputMode: OutputMode): Unit = { + outputMode: OutputMode, + configs: (String, String)*): Unit = { test(s"streaming plan - $name: supported") { - UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode) + withSQLConf(configs: _*) { + UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode) + } } } @@ -1070,14 +1078,18 @@ class UnsupportedOperationsSuite extends SparkFunSuite { expectFailure: Boolean): Unit = { test(s"Global watermark limit - $testNamePostfix") { if (expectFailure) { - val e = intercept[AnalysisException] { - UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit( - wrapInStreaming(plan), outputMode, failWhenDetected = true) + withSQLConf(SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "true") { + val e = intercept[AnalysisException] { + UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit( + wrapInStreaming(plan), outputMode) + } + assert(e.message.contains("Detected pattern of possible 'correctness' issue")) } - assert(e.message.contains("Detected pattern of possible 'correctness' issue")) } else { - UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit( - wrapInStreaming(plan), outputMode, failWhenDetected = true) + withSQLConf(SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false") { + UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit( + wrapInStreaming(plan), outputMode) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala index 2efd715b77..f97c9386f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala @@ -1324,7 +1324,9 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { def testWithAllStateVersions(name: String)(func: => Unit): Unit = { for (version <- FlatMapGroupsWithStateExecHelper.supportedVersions) { test(s"$name - state format version $version") { - withSQLConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key -> version.toString) { + withSQLConf( + SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key -> version.toString, + SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false") { func } }