From b9585cde31fe99aecca42146c71c552218cba591 Mon Sep 17 00:00:00 2001 From: mingjial Date: Sun, 23 Aug 2020 17:40:59 -0700 Subject: [PATCH] [SPARK-32609][TEST] Add Tests for Incorrect exchange reuse with DataSourceV2 ### What changes were proposed in this pull request? Copy to master branch the unit test added for branch-2.4(https://github.com/apache/spark/pull/29430). ### Why are the changes needed? The unit test will pass at master branch, indicating that issue reported in https://issues.apache.org/jira/browse/SPARK-32609 is already fixed at master branch. But adding this unit test for future possible failure catch. ### Does this PR introduce _any_ user-facing change? no. ### How was this patch tested? sbt test run Closes #29435 from mingjialiu/master. Authored-by: mingjial Signed-off-by: Dongjoon Hyun --- .../sql/connector/DataSourceV2Suite.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 2d8761f872..a9c521eb46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -394,6 +394,25 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS checkAnswer(df, (0 until 3).map(i => Row(i))) } } + + test("SPARK-32609: DataSourceV2 with different pushedfilters should be different") { + def getScanExec(query: DataFrame): BatchScanExec = { + query.queryExecution.executedPlan.collect { + case d: BatchScanExec => d + }.head + } + + Seq(classOf[AdvancedDataSourceV2], classOf[JavaAdvancedDataSourceV2]).foreach { cls => + withClue(cls.getName) { + val df = spark.read.format(cls.getName).load() + val q1 = df.select('i).filter('i > 6) + val q2 = df.select('i).filter('i > 5) + val scan1 = getScanExec(q1) + val scan2 = getScanExec(q2) + assert(!scan1.equals(scan2)) + } + } + } }