[SPARK-32948][SQL] Optimize to_json and from_json expression chain

### What changes were proposed in this pull request?

This patch proposes to optimize from_json + to_json expression chain.

### Why are the changes needed?

To optimize json expression chain that could be manually generated or generated automatically during query optimization.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test.

Closes #29828 from viirya/SPARK-32948.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Liang-Chi Hsieh 2020-09-28 22:22:47 -07:00 committed by Dongjoon Hyun
parent 1b60ff5afe
commit 202115e7cd
3 changed files with 188 additions and 0 deletions

View file

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
/**
* Simplify redundant json related expressions.
*/
object OptimizeJsonExprs extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case p => p.transformExpressions {
case jsonToStructs @ JsonToStructs(_, options1,
StructsToJson(options2, child, timeZoneId2), timeZoneId1)
if options1.isEmpty && options2.isEmpty && timeZoneId1 == timeZoneId2 &&
jsonToStructs.dataType == child.dataType =>
// `StructsToJson` only fails when `JacksonGenerator` encounters data types it
// cannot convert to JSON. But `StructsToJson.checkInputDataTypes` already
// verifies its child's data types is convertible to JSON. But in
// `StructsToJson(JsonToStructs(...))` case, we cannot verify input json string
// so `JsonToStructs` might throw error in runtime. Thus we cannot optimize
// this case similarly.
child
}
}
}

View file

@ -111,6 +111,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
RemoveNoopOperators,
CombineWithFields,
SimplifyExtractValueOps,
OptimizeJsonExprs,
CombineConcats) ++
extendedOperatorOptimizationRules

View file

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.util.DateTimeUtils.getZoneId
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
class OptimizeJsonExprsSuite extends PlanTest with ExpressionEvalHelper {
object Optimizer extends RuleExecutor[LogicalPlan] {
val batches = Batch("Json optimization", FixedPoint(10), OptimizeJsonExprs) :: Nil
}
val schema = StructType.fromDDL("a int, b int")
private val structAtt = 'struct.struct(schema).notNull
private val testRelation = LocalRelation(structAtt)
test("SPARK-32948: optimize from_json + to_json") {
val options = Map.empty[String, String]
val query1 = testRelation
.select(JsonToStructs(schema, options, StructsToJson(options, 'struct)).as("struct"))
val optimized1 = Optimizer.execute(query1.analyze)
val expected = testRelation.select('struct.as("struct")).analyze
comparePlans(optimized1, expected)
val query2 = testRelation
.select(
JsonToStructs(schema, options,
StructsToJson(options,
JsonToStructs(schema, options,
StructsToJson(options, 'struct)))).as("struct"))
val optimized2 = Optimizer.execute(query2.analyze)
comparePlans(optimized2, expected)
}
test("SPARK-32948: not optimize from_json + to_json if schema is different") {
val options = Map.empty[String, String]
val schema = StructType.fromDDL("a int")
val query = testRelation
.select(JsonToStructs(schema, options, StructsToJson(options, 'struct)).as("struct"))
val optimized = Optimizer.execute(query.analyze)
val expected = testRelation.select(
JsonToStructs(schema, options, StructsToJson(options, 'struct)).as("struct")).analyze
comparePlans(optimized, expected)
}
test("SPARK-32948: if user gives schema with different letter case under case-insensitive") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
val options = Map.empty[String, String]
val schema = StructType.fromDDL("a int, B int")
val query = testRelation
.select(JsonToStructs(schema, options, StructsToJson(options, 'struct)).as("struct"))
val optimized = Optimizer.execute(query.analyze)
val expected = testRelation.select(
JsonToStructs(schema, options, StructsToJson(options, 'struct)).as("struct")).analyze
comparePlans(optimized, expected)
}
}
test("SPARK-32948: not optimize from_json + to_json if nullability is different") {
val options = Map.empty[String, String]
val nonNullSchema = StructType(
StructField("a", IntegerType, false) :: StructField("b", IntegerType, false) :: Nil)
val structAtt = 'struct.struct(nonNullSchema).notNull
val testRelationWithNonNullAttr = LocalRelation(structAtt)
val schema = StructType.fromDDL("a int, b int")
val query = testRelationWithNonNullAttr
.select(JsonToStructs(schema, options, StructsToJson(options, 'struct)).as("struct"))
val optimized = Optimizer.execute(query.analyze)
val expected = testRelationWithNonNullAttr.select(
JsonToStructs(schema, options, StructsToJson(options, 'struct)).as("struct")).analyze
comparePlans(optimized, expected)
}
test("SPARK-32948: not optimize from_json + to_json if option is not empty") {
val options = Map("testOption" -> "test")
val query = testRelation
.select(JsonToStructs(schema, options, StructsToJson(options, 'struct)).as("struct"))
val optimized = Optimizer.execute(query.analyze)
val expected = testRelation.select(
JsonToStructs(schema, options, StructsToJson(options, 'struct)).as("struct")).analyze
comparePlans(optimized, expected)
}
test("SPARK-32948: not optimize from_json + to_json if timezone is different") {
val options = Map.empty[String, String]
val UTC_OPT = Option("UTC")
val PST = getZoneId("-08:00")
val query1 = testRelation
.select(JsonToStructs(schema, options,
StructsToJson(options, 'struct, Option(PST.getId)), UTC_OPT).as("struct"))
val optimized1 = Optimizer.execute(query1.analyze)
val expected1 = testRelation.select(
JsonToStructs(schema, options,
StructsToJson(options, 'struct, Option(PST.getId)), UTC_OPT).as("struct")).analyze
comparePlans(optimized1, expected1)
val query2 = testRelation
.select(JsonToStructs(schema, options,
StructsToJson(options, 'struct, UTC_OPT), UTC_OPT).as("struct"))
val optimized2 = Optimizer.execute(query2.analyze)
val expected2 = testRelation.select('struct.as("struct")).analyze
comparePlans(optimized2, expected2)
}
}