[SPARK-34548][SQL] Remove unnecessary children from Union under Distinct and Deduplicate

### What changes were proposed in this pull request?

This patch proposes to remove unnecessary children from Union under Distinct and Deduplicate.

### Why are the changes needed?

If a `Union` under `Distinct` or `Deduplicate` has duplicate children, the duplicates can be removed to simplify the query plan.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes #31656 from viirya/SPARK-34548.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
This commit is contained in:
Liang-Chi Hsieh 2021-03-02 17:09:08 -08:00
parent 6c5322de61
commit bab9531134
3 changed files with 52 additions and 12 deletions

View file

@ -157,8 +157,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
// since the other rules might make two separate Unions operators adjacent.
Batch("Union", Once,
RemoveNoopOperators,
RemoveNoopUnion,
CombineUnions) ::
CombineUnions,
RemoveNoopUnion) ::
Batch("OptimizeLimitZero", Once,
OptimizeLimitZero) ::
// Run this once earlier. This might simplify the plan and reduce cost of optimizer.
@ -509,7 +509,8 @@ object RemoveNoopOperators extends Rule[LogicalPlan] {
}
/**
* Remove no-op `Union` from the query plan that do not make any modifications.
* Simplify the children of `Union` or remove no-op `Union` from the query plan that
* do not make any modifications to the query.
*/
object RemoveNoopUnion extends Rule[LogicalPlan] {
/**
@ -532,21 +533,29 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
case _ => plan
}
private def removeUnion(u: Union): Option[LogicalPlan] = {
val unionChildren = u.children.map(removeAliasOnlyProject)
if (unionChildren.tail.forall(unionChildren.head.sameResult(_))) {
Some(u.children.head)
private def simplifyUnion(u: Union): LogicalPlan = {
val uniqueChildren = mutable.ArrayBuffer.empty[LogicalPlan]
val uniqueChildrenKey = mutable.HashSet.empty[LogicalPlan]
u.children.foreach { c =>
val key = removeAliasOnlyProject(c).canonicalized
if (!uniqueChildrenKey.contains(key)) {
uniqueChildren += c
uniqueChildrenKey += key
}
}
if (uniqueChildren.size == 1) {
u.children.head
} else {
None
u.copy(children = uniqueChildren)
}
}
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case d @ Distinct(u: Union) =>
removeUnion(u).map(c => d.withNewChildren(Seq(c))).getOrElse(d)
d.withNewChildren(Seq(simplifyUnion(u)))
case d @ Deduplicate(_, u: Union) =>
removeUnion(u).map(c => d.withNewChildren(Seq(c))).getOrElse(d)
d.withNewChildren(Seq(simplifyUnion(u)))
}
}

View file

@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
@ -33,6 +34,7 @@ class RemoveNoopUnionSuite extends PlanTest {
}
val testRelation = LocalRelation('a.int, 'b.int)
val testRelation2 = LocalRelation(output = Seq('a.int, 'b.int), data = Seq(InternalRow(1, 2)))
test("SPARK-34474: Remove redundant Union under Distinct") {
val union = Union(testRelation :: testRelation :: Nil)
@ -67,4 +69,16 @@ class RemoveNoopUnionSuite extends PlanTest {
val optimized = Optimize.execute(distinct)
comparePlans(optimized, distinct)
}
test("SPARK-34548: Remove unnecessary children from Union") {
  // `testRelation` appears twice in the input; the expected plans keep a single copy of it
  // next to `testRelation2`, and the enclosing operator is left in place.
  val duplicatedUnion = Union(Seq(testRelation, testRelation, testRelation2))
  val simplifiedUnion = Union(Seq(testRelation, testRelation2))

  // Union directly under Distinct.
  val distinctPlan = Distinct(duplicatedUnion)
  comparePlans(Optimize.execute(distinctPlan), Distinct(simplifiedUnion))

  // Union directly under Deduplicate, keyed on every output attribute of `testRelation`.
  val dedupKeys = testRelation.output
  val deduplicatePlan = Deduplicate(dedupKeys, duplicatedUnion)
  comparePlans(Optimize.execute(deduplicatePlan), Deduplicate(dedupKeys, simplifiedUnion))
}
}

View file

@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.optimizer.RemoveNoopUnion
import org.apache.spark.sql.catalyst.plans.logical.Union
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSparkSession}
import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSparkSession, SQLTestData}
import org.apache.spark.sql.test.SQLTestData.NullStrings
import org.apache.spark.sql.types._
@ -860,6 +860,23 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession {
}
}
}
test("SPARK-34548: Remove unnecessary children from Union") {
  // Run the same checks twice: once with `RemoveNoopUnion` excluded from the optimizer and
  // once with no rule excluded, so the results are compared with and without the rule.
  // `foreach` is used because the body is executed purely for its side effects.
  Seq(RemoveNoopUnion.ruleName, "").foreach { ruleName =>
    withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> ruleName) {
      // A separately built DataFrame of (i, i.toString) rows for i in 1..100.
      // NOTE(review): presumably the same rows as `testData` but a different plan, so it is
      // not treated as a duplicate child -- confirm against SQLTestData.
      val testDataCopy = spark.sparkContext.parallelize(
        (1 to 100).map(i => SQLTestData.TestData(i, i.toString))).toDF()

      // The repeated `testData` child must not change the result of the distinct union.
      val distinctUnionDF1 = testData.union(testData).union(testDataCopy).distinct()
      val expected = testData.union(testDataCopy).distinct()
      checkAnswer(distinctUnionDF1, expected)

      // Same union under dropDuplicates on "key".
      // NOTE(review): comparing against `expected` assumes "key" determines the whole row
      // in both inputs -- confirm against the definition of `testData`.
      val distinctUnionDF2 = testData.union(testData).union(testDataCopy)
        .dropDuplicates(Seq("key"))
      checkAnswer(distinctUnionDF2, expected)
    }
  }
}
}
case class UnionClass1a(a: Int, b: Long, nested: UnionClass2)