[SPARK-35972][SQL] When replace ExtractValue in NestedColumnAliasing we should use semanticEquals

### What changes were proposed in this pull request?
Ideally, in SQL query, nested columns should result to GetStructField with non-None name. But there are places that can create GetStructField with None name, such as UnresolvedStar.expand, Dataset encoder stuff, etc.
the current `nestedFieldToAlias` cannot catch it up and will cause job failed.

### Why are the changes needed?
Fix bug

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT,

Closes #33183 from AngersZhuuuu/SPARK-35972.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 87282f04bf)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
This commit is contained in:
Angerszhuuuu 2021-07-06 00:09:34 -07:00 committed by Liang-Chi Hsieh
parent 176b055c12
commit c2ef235419
2 changed files with 33 additions and 7 deletions

View file

@ -144,7 +144,8 @@ object NestedColumnAliasing {
attr -> evAliasSeq
}
val nestedFieldToAlias = attributeToExtractValuesAndAliases.values.flatten.toMap
val nestedFieldToAlias = attributeToExtractValuesAndAliases.values.flatten
.map { case (field, alias) => field.canonicalized -> alias }.toMap
// A reference attribute can have multiple aliases for nested fields.
val attrToAliases =
@ -167,10 +168,10 @@ object NestedColumnAliasing {
*/
def getNewProjectList(
projectList: Seq[NamedExpression],
nestedFieldToAlias: Map[ExtractValue, Alias]): Seq[NamedExpression] = {
nestedFieldToAlias: Map[Expression, Alias]): Seq[NamedExpression] = {
projectList.map(_.transform {
case f: ExtractValue if nestedFieldToAlias.contains(f) =>
nestedFieldToAlias(f).toAttribute
case f: ExtractValue if nestedFieldToAlias.contains(f.canonicalized) =>
nestedFieldToAlias(f.canonicalized).toAttribute
}.asInstanceOf[NamedExpression])
}
@ -180,13 +181,13 @@ object NestedColumnAliasing {
*/
def replaceWithAliases(
plan: LogicalPlan,
nestedFieldToAlias: Map[ExtractValue, Alias],
nestedFieldToAlias: Map[Expression, Alias],
attrToAliases: AttributeMap[Seq[Alias]]): LogicalPlan = {
plan.withNewChildren(plan.children.map { plan =>
Project(plan.output.flatMap(a => attrToAliases.getOrElse(a, Seq(a))), plan)
}).transformExpressions {
case f: ExtractValue if nestedFieldToAlias.contains(f) =>
nestedFieldToAlias(f).toAttribute
case f: ExtractValue if nestedFieldToAlias.contains(f.canonicalized) =>
nestedFieldToAlias(f.canonicalized).toAttribute
}
}

View file

@ -738,6 +738,31 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
val optimized = Optimize.execute(query)
comparePlans(optimized, query)
}
test("SPARK-35972: NestedColumnAliasing should consider semantic equality") {
val dataType = new StructType()
.add(StructField("itemid", StringType))
.add(StructField("search_params", StructType(Seq(
StructField("col1", StringType),
StructField("col2", StringType)
))))
val relation = LocalRelation('struct_data.struct(dataType))
val plan = relation
.repartition(100)
.select(
GetStructField('struct_data, 1, None).as("value"),
$"struct_data.search_params.col1".as("col1"),
$"struct_data.search_params.col2".as("col2")).analyze
val query = Optimize.execute(plan)
val optimized = relation
.select(GetStructField('struct_data, 1, None).as("_extract_search_params"))
.repartition(100)
.select(
$"_extract_search_params".as("value"),
$"_extract_search_params.col1".as("col1"),
$"_extract_search_params.col2".as("col2")).analyze
comparePlans(optimized, query)
}
}
object NestedColumnAliasingSuite {