[SPARK-23523][SQL] Fix the incorrect result caused by the rule OptimizeMetadataOnlyQuery

## What changes were proposed in this pull request?
```Scala
val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
  .write.json(tablePath.getCanonicalPath)
val df = spark.read.json(path.getCanonicalPath).select("CoL1", "CoL5", "CoL3").distinct()
df.show()
```

It generates a wrong result: the expected row is `[a,e,c]`, but the query returns
```
[c,e,a]
```

We have a bug in the rule `OptimizeMetadataOnlyQuery`: the partition attributes it produces should respect the attribute order in the original leaf node. This PR fixes it.
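
To see why the order matters, here is a standalone sketch in plain Scala (not the actual rule; the column names and values are copied from the repro above, everything else is illustrative). As the diff below suggests, the rule pairs partition attributes with partition values laid out in partition-spec (directory) order, so selecting the attributes by filtering the relation output reorders them and misaligns the pairing:

```Scala
// Standalone sketch of the misordering (plain Scala, illustrative names; not Spark code).
object AttributeOrderSketch {
  def main(args: Array[String]): Unit = {
    val relationOutput = Seq("cOl1", "cOl2", "cOl3", "cOl4", "cOl5") // leaf-node output order
    val partitionColumnNames = Seq("cOl3", "cOl1", "cOl5")           // partition-spec (directory) order
    val partitionValues = Seq("c", "a", "e")                         // aligned with the spec order

    // Before the fix: filtering the output keeps output order, so "c" is paired with cOl1.
    val buggyAttrs = relationOutput.filter(partitionColumnNames.contains)
    println(buggyAttrs.zip(partitionValues)) // List((cOl1,c), (cOl3,a), (cOl5,e))

    // After the fix: look each partition column up by lower-cased name, keeping spec order.
    val attrMap = relationOutput.map(a => a.toLowerCase -> a).toMap
    val fixedAttrs = partitionColumnNames.map(n => attrMap(n.toLowerCase))
    println(fixedAttrs.zip(partitionValues)) // List((cOl3,c), (cOl1,a), (cOl5,e))
  }
}
```

With the buggy pairing, selecting `CoL1, CoL5, CoL3` reads back `[c,e,a]`, which is exactly the wrong result shown above.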

## How was this patch tested?
Added a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20684 from gatorsmile/optimizeMetadataOnly.
gatorsmile committed on 2018-02-27 08:44:25 -08:00
commit 414ee867ba (parent eac0b06722)
4 changed files with 40 additions and 6 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala

```diff
@@ -43,10 +43,11 @@ object LocalRelation {
   }
 }
 
-case class LocalRelation(output: Seq[Attribute],
-                         data: Seq[InternalRow] = Nil,
-                         // Indicates whether this relation has data from a streaming source.
-                         override val isStreaming: Boolean = false)
+case class LocalRelation(
+    output: Seq[Attribute],
+    data: Seq[InternalRow] = Nil,
+    // Indicates whether this relation has data from a streaming source.
+    override val isStreaming: Boolean = false)
   extends LeafNode with analysis.MultiInstanceRelation {
 
   // A local relation must have resolved output.
```

sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala

```diff
@@ -17,6 +17,9 @@
 package org.apache.spark.sql.execution
 
+import java.util.Locale
+
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
@@ -80,8 +83,13 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
   private def getPartitionAttrs(
       partitionColumnNames: Seq[String],
       relation: LogicalPlan): Seq[Attribute] = {
-    val partColumns = partitionColumnNames.map(_.toLowerCase).toSet
-    relation.output.filter(a => partColumns.contains(a.name.toLowerCase))
+    val attrMap = relation.output.map(_.name.toLowerCase(Locale.ROOT)).zip(relation.output).toMap
+    partitionColumnNames.map { colName =>
+      attrMap.getOrElse(colName.toLowerCase(Locale.ROOT),
+        throw new AnalysisException(s"Unable to find the column `$colName` " +
+          s"given [${relation.output.map(_.name).mkString(", ")}]")
+      )
+    }
   }
 
   /**
```
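
A note on the `Locale.ROOT` arguments in the new lookup: lower-casing with the JVM default locale is locale-sensitive (the Turkish dotless-i case is the classic example), so a name-keyed lookup could miss under some locales. A minimal, standalone illustration (standard library only, not Spark code):

```Scala
// Illustrates why lower-casing for identifier comparison pins Locale.ROOT.
import java.util.Locale

object LocaleRootSketch {
  def main(args: Array[String]): Unit = {
    Locale.setDefault(new Locale("tr", "TR")) // simulate a Turkish default locale
    println("ID".toLowerCase)                 // "ıd" -- dotless i, breaks name-based lookups
    println("ID".toLowerCase(Locale.ROOT))    // "id" -- stable regardless of the default locale
  }
}
```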

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala

```diff
@@ -67,6 +67,9 @@ case class HadoopFsRelation(
     }
   }
 
+  // When data schema and partition schema have the overlapped columns, the output
+  // schema respects the order of data schema for the overlapped columns, but respect
+  // the data types of partition schema
   val schema: StructType = {
     StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
       partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))
```
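
The new comment documents how `HadoopFsRelation` merges the data schema and the partition schema. A small sketch of that merge rule, using hypothetical schemas (it assumes only the `spark-sql` `StructType` API on the classpath, and ignores the case-sensitivity handling of the real `getColName`):

```Scala
// Sketch: overlapped columns keep the data-schema position but take the partition-schema type.
import java.util.Locale
import org.apache.spark.sql.types._

object SchemaMergeSketch {
  def main(args: Array[String]): Unit = {
    val dataSchema = StructType(Seq(
      StructField("cOl1", StringType), StructField("cOl2", StringType),
      StructField("cOl3", StringType), StructField("cOl4", StringType),
      StructField("cOl5", StringType)))
    // Partition columns in directory order; their inferred types may differ from the data schema.
    val partitionSchema = StructType(Seq(
      StructField("cOl3", IntegerType), StructField("cOl1", IntegerType),
      StructField("cOl5", IntegerType)))

    def getColName(f: StructField): String = f.name.toLowerCase(Locale.ROOT)
    val overlappedPartCols = partitionSchema.map(f => getColName(f) -> f).toMap

    val schema = StructType(
      dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
        partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))

    // Prints cOl1 ... cOl5 in data-schema order, with cOl1/cOl3/cOl5 typed as integer.
    schema.printTreeString()
  }
}
```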

sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala

```diff
@@ -17,9 +17,12 @@
 package org.apache.spark.sql.execution
 
+import java.io.File
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_METADATA_ONLY
 import org.apache.spark.sql.test.SharedSQLContext
 
 class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
@@ -125,4 +128,23 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
       sql("SELECT COUNT(DISTINCT p) FROM t_1000").collect()
     }
   }
+
+  test("Incorrect result caused by the rule OptimizeMetadataOnlyQuery") {
+    withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
+      withTempPath { path =>
+        val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
+        Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
+          .write.json(tablePath.getCanonicalPath)
+
+        val df = spark.read.json(path.getCanonicalPath).select("CoL1", "CoL5", "CoL3").distinct()
+        checkAnswer(df, Row("a", "e", "c"))
+
+        val localRelation = df.queryExecution.optimizedPlan.collectFirst {
+          case l: LocalRelation => l
+        }
+        assert(localRelation.nonEmpty, "expect to see a LocalRelation")
+        assert(localRelation.get.output.map(_.name) == Seq("cOl3", "cOl1", "cOl5"))
+      }
+    }
+  }
 }
```