[SPARK-29069][SQL] ResolveInsertInto should not do table lookup

### What changes were proposed in this pull request?

It's more clear to only do table lookup in `ResolveTables` rule (for v2 tables) and `ResolveRelations` rule (for v1 tables). `ResolveInsertInto` should only resolve the `InsertIntoStatement` with resolved relations.

### Why are the changes needed?

to make the code simpler

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

existing tests

Closes #25774 from cloud-fan/simplify.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
Wenchen Fan 2019-09-16 09:46:34 +09:00 committed by HyukjinKwon
parent 7d4eb38bbc
commit 1b99d0cca4

View file

@ -671,6 +671,15 @@ class Analyzer(
case scala.Right(tableOpt) => tableOpt
}
v2TableOpt.map(DataSourceV2Relation.create).getOrElse(u)
case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
val v2TableOpt = lookupV2Relation(u.multipartIdentifier) match {
case scala.Left((_, _, tableOpt)) => tableOpt
case scala.Right(tableOpt) => tableOpt
}
v2TableOpt.map(DataSourceV2Relation.create).map { v2Relation =>
i.copy(table = v2Relation)
}.getOrElse(i)
}
}
@ -785,41 +794,28 @@ class Analyzer(
object ResolveInsertInto extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
lookupV2Relation(u.multipartIdentifier) match {
case scala.Left((_, _, Some(v2Table: Table))) =>
resolveV2Insert(i, v2Table)
case scala.Right(Some(v2Table: Table)) =>
resolveV2Insert(i, v2Table)
case _ =>
i
case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _) if i.query.resolved =>
// ifPartitionNotExists is append with validation, but validation is not supported
if (i.ifPartitionNotExists) {
throw new AnalysisException(
s"Cannot write, IF NOT EXISTS is not supported for table: ${r.table.name}")
}
}
private def resolveV2Insert(i: InsertIntoStatement, table: Table): LogicalPlan = {
val relation = DataSourceV2Relation.create(table)
// ifPartitionNotExists is append with validation, but validation is not supported
if (i.ifPartitionNotExists) {
throw new AnalysisException(
s"Cannot write, IF NOT EXISTS is not supported for table: ${relation.table.name}")
}
val partCols = partitionColumnNames(r.table)
validatePartitionSpec(partCols, i.partitionSpec)
val partCols = partitionColumnNames(relation.table)
validatePartitionSpec(partCols, i.partitionSpec)
val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
val query = addStaticPartitionColumns(r, i.query, staticPartitions)
val dynamicPartitionOverwrite = partCols.size > staticPartitions.size &&
conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
val query = addStaticPartitionColumns(relation, i.query, staticPartitions)
val dynamicPartitionOverwrite = partCols.size > staticPartitions.size &&
conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
if (!i.overwrite) {
AppendData.byPosition(relation, query)
} else if (dynamicPartitionOverwrite) {
OverwritePartitionsDynamic.byPosition(relation, query)
} else {
OverwriteByExpression.byPosition(
relation, query, staticDeleteExpression(relation, staticPartitions))
}
if (!i.overwrite) {
AppendData.byPosition(r, query)
} else if (dynamicPartitionOverwrite) {
OverwritePartitionsDynamic.byPosition(r, query)
} else {
OverwriteByExpression.byPosition(r, query, staticDeleteExpression(r, staticPartitions))
}
}
private def partitionColumnNames(table: Table): Seq[String] = {