From 253a1aee46abf365a92160996a695bc47e8b6db3 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Wed, 28 Apr 2021 10:47:11 +0900 Subject: [PATCH] [SPARK-35246][SS] Don't allow streaming-batch intersects ### What changes were proposed in this pull request? The UnsupportedOperationChecker shouldn't allow streaming-batch intersects. As described in the ticket, they can't actually be planned correctly, and even simple cases like the below will fail: ``` test("intersect") { val input = MemoryStream[Long] val df = input.toDS().intersect(spark.range(10).as[Long]) testStream(df) ( AddData(input, 1L), CheckAnswer(1) ) } ``` ### Why are the changes needed? Users will be confused by the cryptic errors produced from trying to run an invalid query plan. ### Does this PR introduce _any_ user-facing change? Some queries which previously failed with a poor error will now fail with a better one. ### How was this patch tested? modified unit test Closes #32371 from jose-torres/ossthing. Authored-by: Jose Torres Signed-off-by: hyukjinkwon --- .../sql/catalyst/analysis/UnsupportedOperationChecker.scala | 4 ++-- .../sql/catalyst/analysis/UnsupportedOperationsSuite.scala | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 0d586e7261..a3a85cb120 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -357,8 +357,8 @@ object UnsupportedOperationChecker extends Logging { case Except(left, right, _) if right.isStreaming => throwError("Except on a streaming DataFrame/Dataset on the right is not supported") - case Intersect(left, right, _) if left.isStreaming && right.isStreaming => - throwError("Intersect between two streaming DataFrames/Datasets is not supported") + case Intersect(left, right, _) if left.isStreaming || right.isStreaming => + throwError("Intersect of streaming DataFrames/Datasets is not supported") case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) && outputMode == InternalOutputModes.Update => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index dc62841e05..296d0ee8f4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -585,10 +585,12 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { streamStreamSupported = false, batchStreamSupported = false) - // Intersect: stream-stream not supported + // Intersect: not supported testBinaryOperationInStreamingPlan( "intersect", _.intersect(_, isAll = false), + batchStreamSupported = false, + streamBatchSupported = false, streamStreamSupported = false) // Sort: supported only on batch subplans and after aggregation on streaming plan + complete mode