[SPARK-33259][SS] Disable streaming query with possible correctness issue by default

### What changes were proposed in this pull request?

This patch proposes to disable, by default, streaming queries with a possible correctness issue in chained stateful operators. The behavior is controlled by a SQL config, so users who understand the risk and still want to run such a query can disable the check.
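
For illustration, a minimal sketch of how a user who accepts the risk would opt out, assuming an active `SparkSession` named `spark` (the config key is the one introduced by this patch):

```scala
// Disable the new correctness check for this session before starting the
// query; by default the check is enabled and analysis fails instead.
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
```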

### Why are the changes needed?

The possible correctness issue in chained stateful operators in a streaming query is not straightforward for users. From the user's perspective, it looks like a Spark bug. Worse, users may not even be aware of the correctness issue and consume wrong results.

A better approach is to disable such queries by default and let users opt in once they understand the risk, instead of implicitly running the query and leaving users to discover the correctness issue themselves and report this known issue to the Spark community.

### Does this PR introduce _any_ user-facing change?

Yes. Streaming queries with a possible correctness issue are blocked from running unless users explicitly disable the check via the SQL config.

### How was this patch tested?

Unit test.

Closes #30210 from viirya/SPARK-33259.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
5 changed files with 64 additions and 17 deletions


@@ -26,10 +26,14 @@ Note that this migration guide describes the items specific to Structured Stream
Many items of SQL migration can be applied when migrating Structured Streaming to higher versions.
Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html).
## Upgrading from Structured Streaming 3.0 to 3.1
- In Spark 3.0 and before, for the queries that have stateful operation which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded, Spark only prints a warning message. Since Spark 3.1, Spark will check for such queries with possible correctness issue and throw AnalysisException for it by default. For the users who understand the possible risk of correctness issue and still decide to run the query, please disable this check by setting the config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.
## Upgrading from Structured Streaming 2.4 to 3.0
- In Spark 3.0, Structured Streaming forces the source schema into nullable when file-based datasources such as text, json, csv, parquet and orc are used via `spark.readStream(...)`. Previously, it respected the nullability in source schema; however, it caused issues tricky to debug with NPE. To restore the previous behavior, set `spark.sql.streaming.fileSource.schema.forceNullable` to `false`.
- Spark 3.0 fixes the correctness issue on Stream-stream outer join, which changes the schema of state. (See [SPARK-26154](https://issues.apache.org/jira/browse/SPARK-26154) for more details). If you start your query from checkpoint constructed from Spark 2.x which uses stream-stream outer join, Spark 3.0 fails the query. To recalculate outputs, discard the checkpoint and replay previous inputs.
- In Spark 3.0, the deprecated class `org.apache.spark.sql.streaming.ProcessingTime` has been removed. Use `org.apache.spark.sql.streaming.Trigger.ProcessingTime` instead. Likewise, `org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger` has been removed in favor of `Trigger.Continuous`, and `org.apache.spark.sql.execution.streaming.OneTimeTrigger` has been hidden in favor of `Trigger.Once`.
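
The 3.0-to-3.1 migration item above says the new check can be disabled; to make it concrete, here is a hedged sketch of a chained stateful query of the kind the check rejects. The sources, column names, and intervals are illustrative only; `rate` sources stand in for real streams:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Two rate streams standing in for real sources (names are illustrative).
val impressions = spark.readStream.format("rate").load()
  .select($"timestamp".as("impressionTime"), $"value".as("adId"))
  .withWatermark("impressionTime", "10 seconds")

val clicks = spark.readStream.format("rate").load()
  .select($"timestamp".as("clickTime"), $"value".as("clickAdId"))
  .withWatermark("clickTime", "10 seconds")

// A left-outer stream-stream join emits its null-padded rows only after the
// watermark passes them, so they are already "late" for downstream state.
val joined = impressions.join(
  clicks,
  expr("adId = clickAdId AND clickTime BETWEEN impressionTime AND " +
    "impressionTime + interval 20 seconds"),
  "leftOuter")

// Chaining another stateful operator (here an aggregation) after the outer
// join is the pattern that now fails analysis by default in Spark 3.1.
val counts = joined.groupBy(window($"impressionTime", "1 minute")).count()
```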


@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
/**
@@ -40,10 +41,15 @@ object UnsupportedOperationChecker extends Logging {
}
}
/**
* Checks for possible correctness issue in chained stateful operators. The behavior is
* controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
* Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
* print a warning message.
*/
def checkStreamingQueryGlobalWatermarkLimit(
plan: LogicalPlan,
- outputMode: OutputMode,
- failWhenDetected: Boolean): Unit = {
+ outputMode: OutputMode): Unit = {
def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
case s: Aggregate
if s.isStreaming && outputMode == InternalOutputModes.Append => true
@@ -62,6 +68,8 @@ object UnsupportedOperationChecker extends Logging {
case _ => false
}
val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled
try {
plan.foreach { subPlan =>
if (isStatefulOperation(subPlan)) {
@@ -73,7 +81,10 @@ object UnsupportedOperationChecker extends Logging {
"The query contains stateful operation which can emit rows older than " +
"the current watermark plus allowed late record delay, which are \"late rows\"" +
" in downstream stateful operations and these rows can be discarded. " +
"Please refer the programming guide doc for more details."
"Please refer the programming guide doc for more details. If you understand " +
"the possible risk of correctness issue and still need to run the query, " +
"you can disable this check by setting the config " +
"`spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false."
throwError(errorMsg)(plan)
}
}
@@ -388,7 +399,7 @@ object UnsupportedOperationChecker extends Logging {
checkUnsupportedExpressions(subPlan)
}
- checkStreamingQueryGlobalWatermarkLimit(plan, outputMode, failWhenDetected = false)
+ checkStreamingQueryGlobalWatermarkLimit(plan, outputMode)
}
def checkForContinuous(plan: LogicalPlan, outputMode: OutputMode): Unit = {
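
The hunks above show the throwing path but not the fallback when the flag is off. As a simplified sketch (not Spark's exact internals), the control flow after this change reads roughly as follows, with `hasChainedStatefulOperators` as a stand-in for the pattern match shown above:

```scala
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode

object WatermarkLimitSketch extends Logging {
  // Stand-in for the real detection, which walks the plan looking for a
  // stateful operator downstream of one that may emit late rows.
  private def hasChainedStatefulOperators(p: LogicalPlan, m: OutputMode): Boolean = false

  def check(plan: LogicalPlan, outputMode: OutputMode): Unit = {
    // The flag is now read from the active session conf instead of being
    // threaded through every caller as a failWhenDetected parameter.
    val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled
    if (hasChainedStatefulOperators(plan, outputMode)) {
      val msg = "Detected pattern of possible 'correctness' issue"
      if (failWhenDetected) throw new AnalysisException(msg) else logWarning(msg)
    }
  }
}
```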


@@ -1382,6 +1382,21 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
val STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED =
buildConf("spark.sql.streaming.statefulOperator.checkCorrectness.enabled")
.internal()
.doc("When true, the stateful operators for streaming query will be checked for possible " +
"correctness issue due to global watermark. The correctness issue comes from queries " +
"containing stateful operation which can emit rows older than the current watermark " +
"plus allowed late record delay, which are \"late rows\" in downstream stateful " +
"operations and these rows can be discarded. Please refer the programming guide doc for " +
"more details. Once the issue is detected, Spark will throw analysis exception. " +
"When this config is disabled, Spark will just print warning message for users. " +
"Prior to Spark 3.1.0, the behavior is disabling this config.")
.version("3.1.0")
.booleanConf
.createWithDefault(true)
val VARIABLE_SUBSTITUTE_ENABLED =
buildConf("spark.sql.variable.substitute")
.doc("This enables substitution using syntax like `${var}`, `${system:var}`, " +
@@ -3017,6 +3032,9 @@ class SQLConf extends Serializable with Logging {
def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
def statefulOperatorCorrectnessCheckEnabled: Boolean =
getConf(STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED)
def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)
def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
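
A brief usage sketch for the entry and accessor added above, assuming an active `SparkSession` named `spark`. Marking the entry `.internal()` only hides it from generated documentation; it remains settable like any other conf:

```scala
import org.apache.spark.sql.internal.SQLConf

// Set via the public conf API using the key constant...
spark.conf.set(SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key, "false")
// ...and read it back through the new accessor on the active session's SQLConf.
assert(!SQLConf.get.statefulOperatorCorrectnessCheckEnabled)
```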


@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{FlatMapGroupsWithState, _}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder}
import org.apache.spark.unsafe.types.CalendarInterval
@@ -36,7 +37,7 @@ import org.apache.spark.unsafe.types.CalendarInterval
/** A dummy command for testing unsupported operations. */
case class DummyCommand() extends Command
- class UnsupportedOperationsSuite extends SparkFunSuite {
+ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
val attribute = AttributeReference("a", IntegerType, nullable = true)()
val watermarkMetadata = new MetadataBuilder()
@@ -218,6 +219,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
expectedMsgs = Seq("flatMapGroupsWithState in append mode", "update"))
// FlatMapGroupsWithState(Append) in streaming with aggregation
// Only supported when `spark.sql.streaming.statefulOperator.correctnessCheck` is disabled.
for (outputMode <- Seq(Append, Update, Complete)) {
assertSupportedInStreamingPlan(
"flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
@@ -228,7 +230,8 @@
FlatMapGroupsWithState(
null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null,
streamRelation)),
- outputMode = outputMode)
+ outputMode = outputMode,
+ SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false")
}
for (outputMode <- Seq(Append, Update)) {
@@ -268,6 +271,7 @@
}
// multiple FlatMapGroupsWithStates
// Only supported when `spark.sql.streaming.statefulOperator.correctnessCheck` is disabled.
assertSupportedInStreamingPlan(
"flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are " +
"in append mode",
@@ -275,7 +279,8 @@
isMapGroupsWithState = false, null,
FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
isMapGroupsWithState = false, null, streamRelation)),
- outputMode = Append)
+ outputMode = Append,
+ SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false")
assertNotSupportedInStreamingPlan(
"flatMapGroupsWithState - multiple flatMapGroupsWithStates on s streaming relation but some" +
@@ -995,9 +1000,12 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
def assertSupportedInStreamingPlan(
name: String,
plan: LogicalPlan,
- outputMode: OutputMode): Unit = {
+ outputMode: OutputMode,
+ configs: (String, String)*): Unit = {
test(s"streaming plan - $name: supported") {
- UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode)
+ withSQLConf(configs: _*) {
+ UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode)
+ }
}
}
@@ -1070,14 +1078,18 @@
expectFailure: Boolean): Unit = {
test(s"Global watermark limit - $testNamePostfix") {
if (expectFailure) {
- val e = intercept[AnalysisException] {
- UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
- wrapInStreaming(plan), outputMode, failWhenDetected = true)
+ withSQLConf(SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "true") {
+ val e = intercept[AnalysisException] {
+ UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
+ wrapInStreaming(plan), outputMode)
+ }
+ assert(e.message.contains("Detected pattern of possible 'correctness' issue"))
}
- assert(e.message.contains("Detected pattern of possible 'correctness' issue"))
} else {
- UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
- wrapInStreaming(plan), outputMode, failWhenDetected = true)
+ withSQLConf(SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false") {
+ UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
+ wrapInStreaming(plan), outputMode)
+ }
}
}
}
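
`withSQLConf` comes from the `SQLHelper` trait mixed into the suite above. As a simplified sketch (not Spark's exact implementation), it sets the given entries for the duration of the block and then restores whatever was there before:

```scala
import org.apache.spark.sql.internal.SQLConf

def withSQLConfSketch(pairs: (String, String)*)(f: => Unit): Unit = {
  val conf = SQLConf.get
  // Remember the previous value of each key (None if it was unset).
  val saved = pairs.map { case (key, _) =>
    key -> (if (conf.contains(key)) Some(conf.getConfString(key)) else None)
  }
  pairs.foreach { case (key, value) => conf.setConfString(key, value) }
  try f finally saved.foreach {
    case (key, Some(old)) => conf.setConfString(key, old)
    case (key, None) => conf.unsetConf(key)
  }
}
```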


@@ -1324,7 +1324,9 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
def testWithAllStateVersions(name: String)(func: => Unit): Unit = {
for (version <- FlatMapGroupsWithStateExecHelper.supportedVersions) {
test(s"$name - state format version $version") {
- withSQLConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key -> version.toString) {
+ withSQLConf(
+ SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key -> version.toString,
+ SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false") {
func
}
}