diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala
index bfb6e805c0..1520b486b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala
@@ -97,6 +97,7 @@ object RemoveRedundantProjects extends Rule[SparkPlan] {
       // If a DataSourceV2ScanExec node does not support columnar, a ProjectExec node is required
       // to convert the rows to UnsafeRow. See DataSourceV2Strategy for more details.
       case d: DataSourceV2ScanExecBase if !d.supportsColumnar => false
+      case FilterExec(_, d: DataSourceV2ScanExecBase) if !d.supportsColumnar => false
       case _ =>
         if (requireOrdering) {
           project.output.map(_.exprId.id) == child.output.map(_.exprId.id) &&
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
index 944aa963cc..d71f74e71e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.connector.SimpleWritableDataSource
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -28,6 +29,7 @@ abstract class RemoveRedundantProjectsSuiteBase
   extends QueryTest
   with SharedSparkSession
   with AdaptiveSparkPlanHelper {
+  import testImplicits._
 
   private def assertProjectExecCount(df: DataFrame, expected: Int): Unit = {
     withClue(df.queryExecution) {
@@ -215,6 +217,24 @@ abstract class RemoveRedundantProjectsSuiteBase
         |LIMIT 10
         |""".stripMargin
     assertProjectExec(query, 0, 3)
+
+  }
+
+  Seq("true", "false").foreach { codegenEnabled =>
+    test("SPARK-35287: project generating unsafe row for DataSourceV2ScanRelation " +
+      s"should not be removed (codegen=$codegenEnabled)") {
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled) {
+        withTempPath { path =>
+          val format = classOf[SimpleWritableDataSource].getName
+          spark.range(3).select($"id" as "i", $"id" as "j")
+            .write.format(format).mode("overwrite").save(path.getCanonicalPath)
+
+          val df =
+            spark.read.format(format).load(path.getCanonicalPath).filter($"i" > 0).orderBy($"i")
+          assert(df.collect === Array(Row(1, 1), Row(2, 2)))
+        }
+      }
+    }
   }

 }