127bc899ae
## What changes were proposed in this pull request?

A performance issue was found when using `explode`: if a complex field that contains a huge array is passed through the generator, the whole field is duplicated once for each exploded array element.

Given this example:

```scala
import spark.implicits._ // needed for toDF

// M is the number of array elements, defined elsewhere in the benchmark
val df = spark.sparkContext.parallelize(Seq(("1",
  Array.fill(M)({
    val i = math.random()
    (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
  })))).toDF("col", "arr")
  .selectExpr("col", "struct(col, arr) as st")
  .selectExpr("col", "st.col as col1", "explode(st.arr) as arr_col")
```

The `explode` causes `st` to be duplicated as many times as there are exploded elements.

Benchmark results before the change:

```
[info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
[info] Intel(R) Core(TM) i7-8750H CPU 2.20GHz
[info] generate big nested struct array:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] generate big nested struct array wholestage off         52668          53162         699          0.0      877803.4       1.0X
[info] generate big nested struct array wholestage on          47261          49093        1125          0.0      787690.2       1.1X
[info]
```

The query plan:

```
== Physical Plan ==
Project [col#508, st#512.col AS col1#515, arr_col#519]
+- Generate explode(st#512.arr), [col#508, st#512], false, [arr_col#519]
   +- Project [_1#503 AS col#508, named_struct(col, _1#503, arr, _2#504) AS st#512]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#503, mapobjects(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._1, true, false), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._2, true, false), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._3, true, false), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._4, true, false)), knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#504]
         +- Scan[obj#534]
```

This patch takes a nested column pruning approach to remove unnecessary nested fields. It adds a projection of the needed nested fields, as aliases, on the child of `Generate`, and replaces those fields with the alias attributes in the projection on top of `Generate`.
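The rewrite described above can be sketched on a toy plan tree. This is only an illustration under stated assumptions: the `Expr` and `Plan` classes, the `pruneThroughGenerate` function, and the `_gen_alias_N` numbering are hypothetical stand-ins written for this sketch, not Spark's Catalyst classes or the rule added by the patch, and the sketch ignores expression IDs, nullability, and every plan shape other than a `Project` directly on top of a `Generate`.

```scala
// Toy expression and plan nodes; hypothetical names, NOT Spark's Catalyst API.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class GetField(child: Expr, field: String) extends Expr
final case class Alias(child: Expr, name: String) extends Expr

sealed trait Plan { def output: Seq[Attr] }
final case class Scan(output: Seq[Attr]) extends Plan
final case class Project(exprs: Seq[Expr], child: Plan) extends Plan {
  def output: Seq[Attr] = exprs.map {
    case a: Attr        => a
    case Alias(_, name) => Attr(name)
    case other          => sys.error(s"unnamed expression: $other")
  }
}
// explode(generator), copying the `passThrough` columns next to every generated element.
final case class Generate(generator: Expr, passThrough: Seq[Attr], genOutput: Attr, child: Plan)
    extends Plan {
  def output: Seq[Attr] = passThrough :+ genOutput
}

// Outermost nested field accesses in an expression, e.g. st.col.
def collectGetFields(e: Expr): Seq[GetField] = e match {
  case g: GetField => Seq(g)
  case Alias(c, _) => collectGetFields(c)
  case _: Attr     => Seq.empty
}

// Attributes an expression reads.
def references(e: Expr): Set[Attr] = e match {
  case a: Attr        => Set(a)
  case GetField(c, _) => references(c)
  case Alias(c, _)    => references(c)
}

// Replace known nested field accesses with their alias attributes.
def substitute(e: Expr, m: Map[GetField, Attr]): Expr = e match {
  case g: GetField if m.contains(g) => m(g)
  case GetField(c, f)               => GetField(substitute(c, m), f)
  case Alias(c, n)                  => Alias(substitute(c, m), n)
  case a: Attr                      => a
}

def pruneThroughGenerate(plan: Plan): Plan = plan match {
  case Project(projectList, Generate(gen, passThrough, genOut, child)) =>
    val childAttrs = child.output.toSet
    // Nested fields used above Generate that come from Generate's child.
    val nested = projectList.flatMap(collectGetFields).distinct
      .filter(g => references(g).subsetOf(childAttrs))
    if (nested.isEmpty) plan
    else {
      val aliases = nested.zipWithIndex.map { case (g, i) => g -> Alias(g, s"_gen_alias_$i") }
      val aliasAttrs = aliases.map { case (g, a) => g -> Attr(a.name) }.toMap
      val newProjectList = projectList.map(substitute(_, aliasAttrs))
      // Pass through only what the top projection still reads, plus the new aliases.
      val stillNeeded = newProjectList.flatMap(references).toSet
      val newPassThrough =
        passThrough.filter(stillNeeded.contains) ++ aliases.map { case (_, a) => Attr(a.name) }
      val newChildExprs: Seq[Expr] = child.output ++ aliases.map(_._2)
      Project(newProjectList, Generate(gen, newPassThrough, genOut, Project(newChildExprs, child)))
    }
  case other => other
}

// The example query: select col, st.col as col1, explode(st.arr) as arr_col.
val st = Attr("st")
val before = Project(
  Seq(Attr("col"), Alias(GetField(st, "col"), "col1"), Attr("arr_col")),
  Generate(GetField(st, "arr"), Seq(Attr("col"), st), Attr("arr_col"), Scan(Seq(Attr("col"), st))))
val after = pruneThroughGenerate(before)
println(after)
```

In this toy model the generator still reads `st.arr` from its child, but `st` itself is no longer in the pass-through list, so only the small aliased field is copied for each generated element.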

Benchmark results after the change:

```
[info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
[info] Intel(R) Core(TM) i7-8750H CPU 2.20GHz
[info] generate big nested struct array:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] generate big nested struct array wholestage off           311            331          28          0.2        5188.6       1.0X
[info] generate big nested struct array wholestage on            297            312          15          0.2        4947.3       1.0X
[info]
```

The query plan:

```
== Physical Plan ==
Project [col#592, _gen_alias_608#608 AS col1#599, arr_col#603]
+- Generate explode(st#596.arr), [col#592, _gen_alias_608#608], false, [arr_col#603]
   +- Project [_1#587 AS col#592, named_struct(col, _1#587, arr, _2#588) AS st#596, _1#587 AS _gen_alias_608#608]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#587, mapobjects(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._1, true, false), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._2, true, false), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._3, true, false), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._4, true, false)), knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#588]
         +- Scan[obj#586]
```

This behavior is controlled by the SQL config `spark.sql.optimizer.expression.nestedPruning.enabled`.

## How was this patch tested?

Added a benchmark.

Closes #24637 from viirya/SPARK-27707.

Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>