[SPARK-32678][SQL] Rename EmptyHashedRelationWithAllNullKeys and simplify NAAJ generated code

### What changes were proposed in this pull request?
Some Code refine.

1. rename EmptyHashedRelationWithAllNullKeys to HashedRelationWithAllNullKeys.
2. simplify generated code for BHJ NAAJ.

### Why are the changes needed?
Refine code and naming to avoid confusing understanding.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing test.

Closes #29503 from leanken/leanken-SPARK-32678.

Authored-by: xuewei.linxuewei <xuewei.linxuewei@alibaba-inc.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
This commit is contained in:
xuewei.linxuewei 2020-08-22 22:32:39 -07:00 committed by Liang-Chi Hsieh
parent a4d785dadc
commit f258718535
4 changed files with 12 additions and 24 deletions

View file

@ -20,17 +20,17 @@ package org.apache.spark.sql.execution.adaptive
import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys
/** /**
* This optimization rule detects and convert a NAAJ to an Empty LocalRelation * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
* when buildSide is EmptyHashedRelationWithAllNullKeys. * when buildSide is HashedRelationWithAllNullKeys.
*/ */
object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] { object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
private def canEliminate(plan: LogicalPlan): Boolean = plan match { private def canEliminate(plan: LogicalPlan): Boolean = plan match {
case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
&& stage.broadcast.relationFuture.get().value == EmptyHashedRelationWithAllNullKeys => true && stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys => true
case _ => false case _ => false
} }

View file

@ -146,7 +146,7 @@ case class BroadcastHashJoinExec(
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize) TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
if (hashed == EmptyHashedRelation) { if (hashed == EmptyHashedRelation) {
streamedIter streamedIter
} else if (hashed == EmptyHashedRelationWithAllNullKeys) { } else if (hashed == HashedRelationWithAllNullKeys) {
Iterator.empty Iterator.empty
} else { } else {
val keyGenerator = UnsafeProjection.create( val keyGenerator = UnsafeProjection.create(
@ -228,7 +228,6 @@ case class BroadcastHashJoinExec(
if (isNullAwareAntiJoin) { if (isNullAwareAntiJoin) {
val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
val (matched, _, _) = getJoinCondition(ctx, input)
val numOutput = metricTerm(ctx, "numOutputRows") val numOutput = metricTerm(ctx, "numOutputRows")
if (broadcastRelation.value == EmptyHashedRelation) { if (broadcastRelation.value == EmptyHashedRelation) {
@ -237,26 +236,15 @@ case class BroadcastHashJoinExec(
|$numOutput.add(1); |$numOutput.add(1);
|${consume(ctx, input)} |${consume(ctx, input)}
""".stripMargin """.stripMargin
} else if (broadcastRelation.value == EmptyHashedRelationWithAllNullKeys) { } else if (broadcastRelation.value == HashedRelationWithAllNullKeys) {
s""" s"""
|// If the right side contains any all-null key, NAAJ simply returns Nothing. |// If the right side contains any all-null key, NAAJ simply returns Nothing.
""".stripMargin """.stripMargin
} else { } else {
val found = ctx.freshName("found")
s""" s"""
|boolean $found = false;
|// generate join key for stream side |// generate join key for stream side
|${keyEv.code} |${keyEv.code}
|if ($anyNull) { |if (!$anyNull && $relationTerm.getValue(${keyEv.value}) == null) {
| $found = true;
|} else {
| UnsafeRow $matched = (UnsafeRow)$relationTerm.getValue(${keyEv.value});
| if ($matched != null) {
| $found = true;
| }
|}
|
|if (!$found) {
| $numOutput.add(1); | $numOutput.add(1);
| ${consume(ctx, input)} | ${consume(ctx, input)}
|} |}

View file

@ -472,7 +472,7 @@ private[joins] object UnsafeHashedRelation {
// scalastyle:on throwerror // scalastyle:on throwerror
} }
} else if (isNullAware) { } else if (isNullAware) {
return EmptyHashedRelationWithAllNullKeys return HashedRelationWithAllNullKeys
} }
} }
@ -1056,7 +1056,7 @@ private[joins] object LongHashedRelation {
val key = rowKey.getLong(0) val key = rowKey.getLong(0)
map.append(key, unsafeRow) map.append(key, unsafeRow)
} else if (isNullAware) { } else if (isNullAware) {
return EmptyHashedRelationWithAllNullKeys return HashedRelationWithAllNullKeys
} }
} }
map.optimize() map.optimize()
@ -1067,7 +1067,7 @@ private[joins] object LongHashedRelation {
/** /**
* Common trait with dummy implementation for NAAJ special HashedRelation * Common trait with dummy implementation for NAAJ special HashedRelation
* EmptyHashedRelation * EmptyHashedRelation
* EmptyHashedRelationWithAllNullKeys * HashedRelationWithAllNullKeys
*/ */
trait NullAwareHashedRelation extends HashedRelation with Externalizable { trait NullAwareHashedRelation extends HashedRelation with Externalizable {
override def get(key: InternalRow): Iterator[InternalRow] = { override def get(key: InternalRow): Iterator[InternalRow] = {
@ -1130,8 +1130,8 @@ object EmptyHashedRelation extends NullAwareHashedRelation {
* A special HashedRelation indicates it built from a non-empty input:Iterator[InternalRow], * A special HashedRelation indicates it built from a non-empty input:Iterator[InternalRow],
* which contains all null columns key. * which contains all null columns key.
*/ */
object EmptyHashedRelationWithAllNullKeys extends NullAwareHashedRelation { object HashedRelationWithAllNullKeys extends NullAwareHashedRelation {
override def asReadOnlyCopy(): EmptyHashedRelationWithAllNullKeys.type = this override def asReadOnlyCopy(): HashedRelationWithAllNullKeys.type = this
} }
/** The HashedRelationBroadcastMode requires that rows are broadcasted as a HashedRelation. */ /** The HashedRelationBroadcastMode requires that rows are broadcasted as a HashedRelation. */

View file

@ -1168,7 +1168,7 @@ class AdaptiveQueryExecSuite
} }
} }
test("SPARK-32573: Eliminate NAAJ when BuildSide is EmptyHashedRelationWithAllNullKeys") { test("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") {
withSQLConf( withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) { SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {