From c21303f02c582e97fefc130415e739ddda8dd43e Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 26 Aug 2021 14:55:21 +0800 Subject: [PATCH] [SPARK-36594][SQL][3.2] ORC vectorized reader should properly check maximal number of fields ### What changes were proposed in this pull request? This is the patch on branch-3.2 for https://github.com/apache/spark/pull/33842. See the description in the other PR. ### Why are the changes needed? Avoid OOM/performance regression when reading ORC table with nested column types. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit test in `OrcSourceSuite.scala`. Closes #33843 from c21/branch-3.2. Authored-by: Cheng Su Signed-off-by: Wenchen Fan --- .../datasources/orc/OrcFileFormat.scala | 3 ++- .../datasources/orc/OrcSourceSuite.scala | 26 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 9251d33011..5b08f51850 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.WholeStageCodegenExec import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -132,7 +133,7 @@ class OrcFileFormat override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { val conf = sparkSession.sessionState.conf conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled && - schema.length <= conf.wholeStageMaxNumFields && + !WholeStageCodegenExec.isTooManyFields(conf, schema) && schema.forall(s => supportDataType(s.dataType) && !s.dataType.isInstanceOf[UserDefinedType[_]]) && supportBatchForNestedColumn(sparkSession, schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 9acf59cbd9..348ef6f9ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -659,4 +659,30 @@ class OrcSourceSuite extends OrcSuite with SharedSparkSession { checkAnswer(spark.sql("SELECT _col0, _col2.c1 FROM t1"), Seq(Row(1, "a"))) } } + + test("SPARK-36594: ORC vectorized reader should properly check maximal number of fields") { + withTempPath { dir => + val path = dir.getCanonicalPath + val df = spark.range(10).map { x => + val stringColumn = s"$x" * 10 + val structColumn = (x, s"$x" * 100) + val arrayColumn = (0 until 5).map(i => (x + i, s"$x" * 5)) + val mapColumn = Map(s"$x" -> (x * 0.1, (x, s"$x" * 100))) + (x, stringColumn, structColumn, arrayColumn, mapColumn) + }.toDF("int_col", "string_col", "struct_col", "array_col", "map_col") + df.write.format("orc").save(path) + + Seq(("5", false), ("10", true)).foreach { + case (maxNumFields, vectorizedEnabled) => + withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true", + SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> maxNumFields) { + val scanPlan = spark.read.orc(path).queryExecution.executedPlan + assert(scanPlan.find { + case scan: FileSourceScanExec => scan.supportsColumnar + case _ => false + }.isDefined == vectorizedEnabled) + } + } + } + } }