[SPARK-22018][SQL] Preserve top-level alias metadata when collapsing projects

## What changes were proposed in this pull request?
Consider two projects as follows.
```
Project [a_with_metadata#27 AS b#26]
+- Project [a#0 AS a_with_metadata#27]
   +- LocalRelation <empty>, [a#0, b#1]
```
The child Project has an output column with metadata attached, and the parent Project has an alias that implicitly forwards that metadata, so the metadata is visible to higher operators. Upon applying the CollapseProject optimizer rule, however, the metadata is not preserved.
```
Project [a#0 AS b#26]
+- LocalRelation <empty>, [a#0, b#1]
```
This is incorrect: downstream operators that rely on metadata (e.g., the watermark in Structured Streaming) to identify certain fields will fail to find them. This PR fixes the issue by preserving the metadata of top-level aliases.

## How was this patch tested?
New unit test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #19240 from tdas/SPARK-22018.
This commit is contained in:
Tathagata Das 2017-09-14 22:32:16 -07:00
parent a28728a9af
commit 88661747f5
2 changed files with 25 additions and 3 deletions

View file

@ -2256,7 +2256,10 @@ object CleanupAliases extends Rule[LogicalPlan] {
/**
 * Strips aliases from everything below the top level of `e`, while keeping a
 * top-level [[Alias]] itself intact.
 *
 * The alias is rebuilt with `explicitMetadata = Some(a.metadata)`: `a.metadata`
 * may be implicitly derived from a child alias that is about to be trimmed
 * away, so it must be pinned explicitly here or it would be lost (SPARK-22018).
 */
def trimNonTopLevelAliases(e: Expression): Expression = e match {
  case a: Alias =>
    // Rebuild via copy() so exprId and qualifier survive; withNewChildren
    // would drop the implicitly-forwarded metadata.
    a.copy(child = trimAliases(a.child))(
      exprId = a.exprId,
      qualifier = a.qualifier,
      explicitMetadata = Some(a.metadata))
  case other => trimAliases(other)
}

View file

@ -20,10 +20,11 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Rand
import org.apache.spark.sql.catalyst.expressions.{Alias, Rand}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types.MetadataBuilder
class CollapseProjectSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
@ -119,4 +120,22 @@ class CollapseProjectSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
test("preserve top-level alias metadata while collapsing projects") {
  // True iff the plan's root Project carries an alias whose metadata
  // contains the marker key.
  def topLevelHasMetadata(plan: LogicalPlan): Boolean =
    plan.asInstanceOf[Project].projectList.exists(_.metadata.contains("key"))

  val aliasMetadata = new MetadataBuilder().putLong("key", 1).build()
  // Inner project attaches explicit metadata to the alias; the outer project
  // forwards it implicitly through a plain alias.
  val inner =
    Project(Seq(Alias('a, "a_with_metadata")(explicitMetadata = Some(aliasMetadata))),
      testRelation.logicalPlan)
  val analyzed = Project(Seq(Alias('a_with_metadata, "b")()), inner).analyze
  require(topLevelHasMetadata(analyzed))

  val optimized = Optimize.execute(analyzed)
  // The two projects must collapse into one, and the collapsed alias must
  // still expose the forwarded metadata.
  val remainingProjects = optimized.collect { case p: Project => p }
  assert(remainingProjects.size === 1)
  assert(topLevelHasMetadata(optimized))
}
}