[SPARK-35246][SS] Don't allow streaming-batch intersects

### What changes were proposed in this pull request?
The UnsupportedOperationChecker shouldn't allow streaming-batch intersects. As described in the ticket, they can't actually be planned correctly, and even simple cases like the below will fail:

```
  test("intersect") {
    val input = MemoryStream[Long]
    val df = input.toDS().intersect(spark.range(10).as[Long])
    testStream(df) (
      AddData(input, 1L),
      CheckAnswer(1)
    )
  }
```

### Why are the changes needed?
Users will be confused by the cryptic errors produced from trying to run an invalid query plan.

### Does this PR introduce _any_ user-facing change?
Some queries which previously failed with a poor error will now fail with a better one.

### How was this patch tested?
modified unit test

Closes #32371 from jose-torres/ossthing.

Authored-by: Jose Torres <joseph.torres@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
This commit is contained in:
Jose Torres 2021-04-28 10:47:11 +09:00 committed by hyukjinkwon
parent 10c2b68d24
commit 253a1aee46
2 changed files with 5 additions and 3 deletions

View file

@ -357,8 +357,8 @@ object UnsupportedOperationChecker extends Logging {
case Except(left, right, _) if right.isStreaming =>
throwError("Except on a streaming DataFrame/Dataset on the right is not supported")
case Intersect(left, right, _) if left.isStreaming && right.isStreaming =>
throwError("Intersect between two streaming DataFrames/Datasets is not supported")
case Intersect(left, right, _) if left.isStreaming || right.isStreaming =>
throwError("Intersect of streaming DataFrames/Datasets is not supported")
case GlobalLimit(_, _) | LocalLimit(_, _)
if subPlan.children.forall(_.isStreaming) && outputMode == InternalOutputModes.Update =>

View file

@ -585,10 +585,12 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
streamStreamSupported = false,
batchStreamSupported = false)
// Intersect: stream-stream not supported
// Intersect: not supported
testBinaryOperationInStreamingPlan(
"intersect",
_.intersect(_, isAll = false),
batchStreamSupported = false,
streamBatchSupported = false,
streamStreamSupported = false)
// Sort: supported only on batch subplans and after aggregation on streaming plan + complete mode