[SPARK-21165][SQL] FileFormatWriter should handle mismatched attribute ids between logical and physical plan

## What changes were proposed in this pull request?

Because the optimizer may remove unnecessary aliases, the logical and physical plans can end up with different output attribute ids. FileFormatWriter should handle this when creating the physical sort node.
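
For context, here is a minimal sketch of the binding mechanism (illustrative only, not part of the patch; it assumes Catalyst's expression classes on the classpath, e.g. pasted into a spark-shell):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StringType

// Two attributes standing in for the analyzed plan's output, playing
// the role that `allColumns` plays in FileFormatWriter.
val word  = AttributeReference("word", StringType)()
val first = AttributeReference("first", StringType)()
val allColumns = Seq(word, first)

// Binding resolves `first` to its position in `allColumns`, giving
// (roughly) SortOrder(BoundReference(1, StringType, true), Ascending).
// A BoundReference is evaluated by ordinal rather than by attribute id,
// so it keeps working even after the optimizer re-assigns the ids in
// the physical plan.
val boundOrder = BindReferences.bindReference(SortOrder(first, Ascending), allColumns)
```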

## How was this patch tested?

A new regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19483 from cloud-fan/bug2.
Wenchen Fan 2017-10-13 13:09:35 +08:00
parent 3ff766f61a
commit ec122209fb
3 changed files with 29 additions and 2 deletions


```diff
@@ -180,8 +180,13 @@ object FileFormatWriter extends Logging {
     val rdd = if (orderingMatched) {
       queryExecution.toRdd
     } else {
+      // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
+      // the physical plan may have different attribute ids due to optimizer removing some
+      // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
+      val orderingExpr = requiredOrdering
+        .map(SortOrder(_, Ascending)).map(BindReferences.bindReference(_, allColumns))
       SortExec(
-        requiredOrdering.map(SortOrder(_, Ascending)),
+        orderingExpr,
         global = false,
         child = queryExecution.executedPlan).execute()
     }
```
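
Why binding ahead of time helps: without it, `SortExec` binds the `SortOrder` expressions against its child's output at execution time, and when the optimizer has re-assigned attribute ids that lookup misses and fails with Catalyst's bind error (roughly `Couldn't find <attr> in [...]`). Binding eagerly against `allColumns`, which carries the same ids as `requiredOrdering`, turns the attributes into ordinal-based references before the mismatch can bite.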


```diff
@@ -32,7 +32,7 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("FileFormatWriter should respect the input query schema") {
+  test("SPARK-22252: FileFormatWriter should respect the input query schema") {
     withTable("t1", "t2", "t3", "t4") {
       spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
       spark.sql("select COL1, COL2 from t1").write.saveAsTable("t2")
```


```diff
@@ -728,4 +728,26 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
       assert(e.contains("mismatched input 'ROW'"))
     }
   }
+
+  test("SPARK-21165: FileFormatWriter should only rely on attributes from analyzed plan") {
+    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+      withTable("tab1", "tab2") {
+        Seq(("a", "b", 3)).toDF("word", "first", "length").write.saveAsTable("tab1")
+
+        spark.sql(
+          """
+            |CREATE TABLE tab2 (word string, length int)
+            |PARTITIONED BY (first string)
+          """.stripMargin)
+
+        spark.sql(
+          """
+            |INSERT INTO TABLE tab2 PARTITION(first)
+            |SELECT word, length, cast(first as string) as first FROM tab1
+          """.stripMargin)
+
+        checkAnswer(spark.table("tab2"), Row("a", 3, "b"))
+      }
+    }
+  }
 }
```
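
As a side note on why this query triggers the bug: in the analyzed plan, `cast(first as string) as first` is an `Alias` with a fresh attribute id, but since `first` is already a string the optimizer simplifies the cast away and then drops the now-redundant alias, so the physical plan exposes the original attribute with a different id. The dynamic-partition sort built from the analyzed plan's ids would then fail to bind without the fix above.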