[SPARK-19268][SS] Disallow adaptive query execution for streaming queries
## What changes were proposed in this pull request? As adaptive query execution may change the number of partitions in different batches, it may break streaming queries. Hence, we should disallow this feature in Structured Streaming. ## How was this patch tested? `test("SPARK-19268: Adaptive query execution should be disallowed")`. Author: Shixiong Zhu <shixiong@databricks.com> Closes #16683 from zsxwing/SPARK-19268.
This commit is contained in:
parent
e576c1ed79
commit
60bd91a340
|
@ -230,6 +230,12 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
|
|||
UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
|
||||
}
|
||||
|
||||
if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
|
||||
throw new AnalysisException(
|
||||
s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
|
||||
"is not supported in streaming DataFrames/Datasets")
|
||||
}
|
||||
|
||||
new StreamingQueryWrapper(new StreamExecution(
|
||||
sparkSession,
|
||||
userSpecifiedName.orNull,
|
||||
|
|
|
@ -30,8 +30,9 @@ import org.scalatest.time.Span
|
|||
import org.scalatest.time.SpanSugar._
|
||||
|
||||
import org.apache.spark.SparkException
|
||||
import org.apache.spark.sql.Dataset
|
||||
import org.apache.spark.sql.{AnalysisException, Dataset}
|
||||
import org.apache.spark.sql.execution.streaming._
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.streaming.util.BlockingSource
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
|
@ -238,6 +239,15 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
|
|||
}
|
||||
}
|
||||
|
||||
test("SPARK-19268: Adaptive query execution should be disallowed") {
|
||||
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
|
||||
val e = intercept[AnalysisException] {
|
||||
MemoryStream[Int].toDS.writeStream.queryName("test-query").format("memory").start()
|
||||
}
|
||||
assert(e.getMessage.contains(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key) &&
|
||||
e.getMessage.contains("not supported"))
|
||||
}
|
||||
}
|
||||
|
||||
/** Run a body of code by defining a query on each dataset */
|
||||
private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
|
||||
|
|
Loading…
Reference in a new issue