[SPARK-36594][SQL][3.2] ORC vectorized reader should properly check maximal number of fields
### What changes were proposed in this pull request?

This is the patch on branch-3.2 for https://github.com/apache/spark/pull/33842. See the description in the other PR.

### Why are the changes needed?

Avoid an OOM/performance regression when reading an ORC table with nested column types.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `OrcSourceSuite.scala`.

Closes #33843 from c21/branch-3.2.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
parent 0c364e607d
commit c21303f02c
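For a concrete sense of the bug: the old check in `supportBatch` compared `schema.length` (top-level columns only) against the whole-stage field budget, so a single deeply nested column slipped under the limit no matter how many leaf fields it contained. An illustrative schema (the names here are made up for the example):

```scala
import org.apache.spark.sql.types._

// One top-level column hiding 100 leaf fields: `schema.length` is 1,
// so the pre-patch check enabled the vectorized reader even with a
// field budget of 10, allocating a column vector per leaf field.
val nested = StructType((1 to 100).map(i => StructField(s"f$i", IntegerType)))
val schema = StructType(Seq(StructField("big_struct", nested)))
assert(schema.length == 1)
```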
OrcFileFormat.scala:

```diff
@@ -36,6 +36,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
```
```diff
@@ -132,7 +133,7 @@ class OrcFileFormat
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
     conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled &&
-      schema.length <= conf.wholeStageMaxNumFields &&
+      !WholeStageCodegenExec.isTooManyFields(conf, schema) &&
       schema.forall(s => supportDataType(s.dataType) &&
         !s.dataType.isInstanceOf[UserDefinedType[_]]) &&
       supportBatchForNestedColumn(sparkSession, schema)
```
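`WholeStageCodegenExec.isTooManyFields` counts fields recursively, so nested leaves count against `spark.sql.codegen.maxFields`. A minimal sketch of the counting idea (an assumption about its shape for illustration, not Spark's actual implementation):

```scala
import org.apache.spark.sql.types._

// Illustrative recursive leaf-field counter; the real logic lives in
// WholeStageCodegenExec.isTooManyFields.
def totalFields(dt: DataType): Int = dt match {
  case s: StructType => s.fields.map(f => totalFields(f.dataType)).sum
  case a: ArrayType  => totalFields(a.elementType)
  case m: MapType    => totalFields(m.keyType) + totalFields(m.valueType)
  case _             => 1
}

def isTooManyFields(maxFields: Int, schema: StructType): Boolean =
  totalFields(schema) > maxFields

// For the 100-leaf struct above: totalFields(schema) == 100, so with a
// budget of 10 fields the vectorized reader is now correctly disabled.
```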
OrcSourceSuite.scala:

```diff
@@ -659,4 +659,30 @@ class OrcSourceSuite extends OrcSuite with SharedSparkSession {
       checkAnswer(spark.sql("SELECT _col0, _col2.c1 FROM t1"), Seq(Row(1, "a")))
     }
   }
+
+  test("SPARK-36594: ORC vectorized reader should properly check maximal number of fields") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = spark.range(10).map { x =>
+        val stringColumn = s"$x" * 10
+        val structColumn = (x, s"$x" * 100)
+        val arrayColumn = (0 until 5).map(i => (x + i, s"$x" * 5))
+        val mapColumn = Map(s"$x" -> (x * 0.1, (x, s"$x" * 100)))
+        (x, stringColumn, structColumn, arrayColumn, mapColumn)
+      }.toDF("int_col", "string_col", "struct_col", "array_col", "map_col")
+      df.write.format("orc").save(path)
+
+      Seq(("5", false), ("10", true)).foreach {
+        case (maxNumFields, vectorizedEnabled) =>
+          withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true",
+            SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> maxNumFields) {
+            val scanPlan = spark.read.orc(path).queryExecution.executedPlan
+            assert(scanPlan.find {
+              case scan: FileSourceScanExec => scan.supportsColumnar
+              case _ => false
+            }.isDefined == vectorizedEnabled)
+          }
+      }
+    }
+  }
 }
```
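To observe the fallback from a session, one can mirror the test with the user-facing keys for these confs (`spark.sql.orc.enableNestedColumnVectorizedReader` and `spark.sql.codegen.maxFields`). A hedged spark-shell sketch; the path and its data are assumptions:

```scala
import org.apache.spark.sql.execution.FileSourceScanExec

// Assumed: an ORC table with wide nested columns was written to this path.
val path = "/tmp/orc_nested"

spark.conf.set("spark.sql.orc.enableNestedColumnVectorizedReader", "true")
spark.conf.set("spark.sql.codegen.maxFields", "5")

val plan = spark.read.orc(path).queryExecution.executedPlan
// After this patch the scan reports supportsColumnar = false once the
// total (nested) field count exceeds the budget, i.e. expect Some(false).
println(plan.collectFirst { case s: FileSourceScanExec => s.supportsColumnar })
```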