[SPARK-33078][SQL] Add config for json expression optimization
### What changes were proposed in this pull request? This proposes to add a config for json expression optimization. ### Why are the changes needed? For the new Json expression optimization rules, it is safer if we can disable it using SQL config. ### Does this PR introduce _any_ user-facing change? Yes, users can disable json expression optimization rule. ### How was this patch tested? Unit test Closes #30047 from viirya/SPARK-33078. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
parent
82eea13c76
commit
9e3746469c
|
@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
|
||||||
import org.apache.spark.sql.catalyst.expressions._
|
import org.apache.spark.sql.catalyst.expressions._
|
||||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||||
import org.apache.spark.sql.catalyst.rules.Rule
|
import org.apache.spark.sql.catalyst.rules.Rule
|
||||||
|
import org.apache.spark.sql.internal.SQLConf
|
||||||
import org.apache.spark.sql.types.{ArrayType, StructType}
|
import org.apache.spark.sql.types.{ArrayType, StructType}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -35,7 +36,7 @@ import org.apache.spark.sql.types.{ArrayType, StructType}
|
||||||
*/
|
*/
|
||||||
object OptimizeJsonExprs extends Rule[LogicalPlan] {
|
object OptimizeJsonExprs extends Rule[LogicalPlan] {
|
||||||
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
|
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
|
||||||
case p => p.transformExpressions {
|
case p if SQLConf.get.jsonExpressionOptimization => p.transformExpressions {
|
||||||
|
|
||||||
case c: CreateNamedStruct
|
case c: CreateNamedStruct
|
||||||
// If we create struct from various fields of the same `JsonToStructs`.
|
// If we create struct from various fields of the same `JsonToStructs`.
|
||||||
|
|
|
@ -1458,6 +1458,15 @@ object SQLConf {
|
||||||
.booleanConf
|
.booleanConf
|
||||||
.createWithDefault(true)
|
.createWithDefault(true)
|
||||||
|
|
||||||
|
val JSON_EXPRESSION_OPTIMIZATION =
|
||||||
|
buildConf("spark.sql.optimizer.enableJsonExpressionOptimization")
|
||||||
|
.doc("Whether to optimize JSON expressions in SQL optimizer. It includes pruning " +
|
||||||
|
"unnecessary columns from from_json, simplifing from_json + to_json, to_json + " +
|
||||||
|
"named_struct(from_json.col1, from_json.col2, ....).")
|
||||||
|
.version("3.1.0")
|
||||||
|
.booleanConf
|
||||||
|
.createWithDefault(true)
|
||||||
|
|
||||||
val FILE_SINK_LOG_DELETION = buildConf("spark.sql.streaming.fileSink.log.deletion")
|
val FILE_SINK_LOG_DELETION = buildConf("spark.sql.streaming.fileSink.log.deletion")
|
||||||
.internal()
|
.internal()
|
||||||
.doc("Whether to delete the expired log files in file stream sink.")
|
.doc("Whether to delete the expired log files in file stream sink.")
|
||||||
|
@ -3232,6 +3241,8 @@ class SQLConf extends Serializable with Logging {
|
||||||
|
|
||||||
def jsonGeneratorIgnoreNullFields: Boolean = getConf(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS)
|
def jsonGeneratorIgnoreNullFields: Boolean = getConf(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS)
|
||||||
|
|
||||||
|
def jsonExpressionOptimization: Boolean = getConf(SQLConf.JSON_EXPRESSION_OPTIMIZATION)
|
||||||
|
|
||||||
def parallelFileListingInStatsComputation: Boolean =
|
def parallelFileListingInStatsComputation: Boolean =
|
||||||
getConf(SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION)
|
getConf(SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION)
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,15 @@ import org.apache.spark.sql.types._
|
||||||
|
|
||||||
class OptimizeJsonExprsSuite extends PlanTest with ExpressionEvalHelper {
|
class OptimizeJsonExprsSuite extends PlanTest with ExpressionEvalHelper {
|
||||||
|
|
||||||
|
private var jsonExpressionOptimizeEnabled: Boolean = _
|
||||||
|
protected override def beforeAll(): Unit = {
|
||||||
|
jsonExpressionOptimizeEnabled = SQLConf.get.jsonExpressionOptimization
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override def afterAll(): Unit = {
|
||||||
|
SQLConf.get.setConf(SQLConf.JSON_EXPRESSION_OPTIMIZATION, jsonExpressionOptimizeEnabled)
|
||||||
|
}
|
||||||
|
|
||||||
object Optimizer extends RuleExecutor[LogicalPlan] {
|
object Optimizer extends RuleExecutor[LogicalPlan] {
|
||||||
val batches = Batch("Json optimization", FixedPoint(10), OptimizeJsonExprs) :: Nil
|
val batches = Batch("Json optimization", FixedPoint(10), OptimizeJsonExprs) :: Nil
|
||||||
}
|
}
|
||||||
|
@ -266,4 +275,16 @@ class OptimizeJsonExprsSuite extends PlanTest with ExpressionEvalHelper {
|
||||||
checkEvaluation(e1, e2.eval(row), row)
|
checkEvaluation(e1, e2.eval(row), row)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("SPARK-33078: disable json optimization") {
|
||||||
|
withSQLConf(SQLConf.JSON_EXPRESSION_OPTIMIZATION.key -> "false") {
|
||||||
|
val options = Map.empty[String, String]
|
||||||
|
|
||||||
|
val query = testRelation
|
||||||
|
.select(JsonToStructs(schema, options, StructsToJson(options, 'struct)).as("struct"))
|
||||||
|
val optimized = Optimizer.execute(query.analyze)
|
||||||
|
|
||||||
|
comparePlans(optimized, query.analyze)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue