[SPARK-32609][TEST] Add Tests for Incorrect exchange reuse with DataSourceV2

### What changes were proposed in this pull request?
Copy  to master branch the unit test added for branch-2.4(https://github.com/apache/spark/pull/29430).

### Why are the changes needed?
The unit test will pass at master branch, indicating that issue reported in https://issues.apache.org/jira/browse/SPARK-32609 is already fixed at master branch. But adding this unit test for future possible failure catch.

### Does this PR introduce _any_ user-facing change?
no.

### How was this patch tested?
sbt test run

Closes #29435 from mingjialiu/master.

Authored-by: mingjial <mingjial@google.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
mingjial 2020-08-23 17:40:59 -07:00 committed by Dongjoon Hyun
parent 8749f2e87a
commit b9585cde31

View file

@ -394,6 +394,25 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
checkAnswer(df, (0 until 3).map(i => Row(i))) checkAnswer(df, (0 until 3).map(i => Row(i)))
} }
} }
test("SPARK-32609: DataSourceV2 with different pushedfilters should be different") {
def getScanExec(query: DataFrame): BatchScanExec = {
query.queryExecution.executedPlan.collect {
case d: BatchScanExec => d
}.head
}
Seq(classOf[AdvancedDataSourceV2], classOf[JavaAdvancedDataSourceV2]).foreach { cls =>
withClue(cls.getName) {
val df = spark.read.format(cls.getName).load()
val q1 = df.select('i).filter('i > 6)
val q2 = df.select('i).filter('i > 5)
val scan1 = getScanExec(q1)
val scan2 = getScanExec(q2)
assert(!scan1.equals(scan2))
}
}
}
} }