From 9e3746469c23fd88f6dacc5082a157ca6970414e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 15 Oct 2020 12:38:10 -0700 Subject: [PATCH] [SPARK-33078][SQL] Add config for json expression optimization ### What changes were proposed in this pull request? This proposes to add a config for json expression optimization. ### Why are the changes needed? For the new Json expression optimization rules, it is safer if we can disable it using SQL config. ### Does this PR introduce _any_ user-facing change? Yes, users can disable json expression optimization rule. ### How was this patch tested? Unit test Closes #30047 from viirya/SPARK-33078. Authored-by: Liang-Chi Hsieh Signed-off-by: Dongjoon Hyun --- .../optimizer/OptimizeJsonExprs.scala | 3 ++- .../apache/spark/sql/internal/SQLConf.scala | 11 ++++++++++ .../optimizer/OptimizeJsonExprsSuite.scala | 21 +++++++++++++++++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala index fcd5412d66..ce86d8cdd4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, StructType} /** @@ -35,7 +36,7 @@ import org.apache.spark.sql.types.{ArrayType, StructType} */ object OptimizeJsonExprs extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case p => p.transformExpressions { + case p if SQLConf.get.jsonExpressionOptimization => 
p.transformExpressions { case c: CreateNamedStruct // If we create struct from various fields of the same `JsonToStructs`. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d4c7dd7f31..79d78088f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1458,6 +1458,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val JSON_EXPRESSION_OPTIMIZATION = + buildConf("spark.sql.optimizer.enableJsonExpressionOptimization") + .doc("Whether to optimize JSON expressions in SQL optimizer. It includes pruning " + + "unnecessary columns from from_json, simplifying from_json + to_json, to_json + " + + "named_struct(from_json.col1, from_json.col2, ....).") + .version("3.1.0") + .booleanConf + .createWithDefault(true) + val FILE_SINK_LOG_DELETION = buildConf("spark.sql.streaming.fileSink.log.deletion") .internal() .doc("Whether to delete the expired log files in file stream sink.") @@ -3232,6 +3241,8 @@ class SQLConf extends Serializable with Logging { def jsonGeneratorIgnoreNullFields: Boolean = getConf(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS) + def jsonExpressionOptimization: Boolean = getConf(SQLConf.JSON_EXPRESSION_OPTIMIZATION) + def parallelFileListingInStatsComputation: Boolean = getConf(SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala index 7d975a1b00..4129a37eb6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala @@ -29,6 +29,15 @@ import org.apache.spark.sql.types._ class
OptimizeJsonExprsSuite extends PlanTest with ExpressionEvalHelper { + private var jsonExpressionOptimizeEnabled: Boolean = _ + protected override def beforeAll(): Unit = { + jsonExpressionOptimizeEnabled = SQLConf.get.jsonExpressionOptimization + } + + protected override def afterAll(): Unit = { + SQLConf.get.setConf(SQLConf.JSON_EXPRESSION_OPTIMIZATION, jsonExpressionOptimizeEnabled) + } + object Optimizer extends RuleExecutor[LogicalPlan] { val batches = Batch("Json optimization", FixedPoint(10), OptimizeJsonExprs) :: Nil } @@ -266,4 +275,16 @@ class OptimizeJsonExprsSuite extends PlanTest with ExpressionEvalHelper { checkEvaluation(e1, e2.eval(row), row) }) } + + test("SPARK-33078: disable json optimization") { + withSQLConf(SQLConf.JSON_EXPRESSION_OPTIMIZATION.key -> "false") { + val options = Map.empty[String, String] + + val query = testRelation + .select(JsonToStructs(schema, options, StructsToJson(options, 'struct)).as("struct")) + val optimized = Optimizer.execute(query.analyze) + + comparePlans(optimized, query.analyze) + } + } }