[SPARK-8014] [SQL] Avoid premature metadata discovery when writing a HadoopFsRelation with a save mode other than Append
The current code references the schema of the DataFrame to be written before checking save mode. This triggers expensive metadata discovery prematurely. For save mode other than `Append`, this metadata discovery is useless since we either ignore the result (for `Ignore` and `ErrorIfExists`) or delete existing files (for `Overwrite`) later.
This PR fixes this issue by deferring metadata discovery after save mode checking.
Author: Cheng Lian <lian@databricks.com>
Closes #6583 from liancheng/spark-8014 and squashes the following commits:
1aafabd [Cheng Lian] Updates comments
088abaa [Cheng Lian] Avoids schema merging and partition discovery when data schema and partition schema are defined
8fbd93f [Cheng Lian] Fixes SPARK-8014
(cherry picked from commit 686a45f0b9
)
Signed-off-by: Yin Huai <yhuai@databricks.com>
This commit is contained in:
parent
815e056542
commit
cbaf595447
|
@ -190,7 +190,7 @@ private[sql] class ParquetRelation2(
|
|||
}
|
||||
}
|
||||
|
||||
override def dataSchema: StructType = metadataCache.dataSchema
|
||||
override def dataSchema: StructType = maybeDataSchema.getOrElse(metadataCache.dataSchema)
|
||||
|
||||
override private[sql] def refresh(): Unit = {
|
||||
super.refresh()
|
||||
|
|
|
@ -30,9 +30,10 @@ import org.apache.spark._
|
|||
import org.apache.spark.mapred.SparkHadoopMapRedUtil
|
||||
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
|
||||
import org.apache.spark.sql.catalyst.CatalystTypeConverters
|
||||
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
|
||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan}
|
||||
import org.apache.spark.sql.execution.RunnableCommand
|
||||
import org.apache.spark.sql.types.StructType
|
||||
import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext, SaveMode}
|
||||
|
@ -94,10 +95,19 @@ private[sql] case class InsertIntoHadoopFsRelation(
|
|||
|
||||
// We create a DataFrame by applying the schema of relation to the data to make sure.
|
||||
// We are writing data based on the expected schema,
|
||||
val df = sqlContext.createDataFrame(
|
||||
DataFrame(sqlContext, query).queryExecution.toRdd,
|
||||
relation.schema,
|
||||
needsConversion = false)
|
||||
val df = {
|
||||
// For partitioned relation r, r.schema's column ordering can be different from the column
|
||||
// ordering of data.logicalPlan (partition columns are all moved after data column). We
|
||||
// need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can
|
||||
// safely apply the schema of r.schema to the data.
|
||||
val project = Project(
|
||||
relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
|
||||
|
||||
sqlContext.createDataFrame(
|
||||
DataFrame(sqlContext, project).queryExecution.toRdd,
|
||||
relation.schema,
|
||||
needsConversion = false)
|
||||
}
|
||||
|
||||
val partitionColumns = relation.partitionColumns.fieldNames
|
||||
if (partitionColumns.isEmpty) {
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
|
|||
import org.apache.spark.Logging
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
|
||||
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
|
||||
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
|
||||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
|
||||
import org.apache.spark.sql.catalyst.plans.logical._
|
||||
import org.apache.spark.sql.execution.RunnableCommand
|
||||
|
@ -322,19 +322,13 @@ private[sql] object ResolvedDataSource {
|
|||
Some(partitionColumnsSchema(data.schema, partitionColumns)),
|
||||
caseInsensitiveOptions)
|
||||
|
||||
// For partitioned relation r, r.schema's column ordering is different with the column
|
||||
// ordering of data.logicalPlan. We need a Project to adjust the ordering.
|
||||
// So, inside InsertIntoHadoopFsRelation, we can safely apply the schema of r.schema to
|
||||
// the data.
|
||||
val project =
|
||||
Project(
|
||||
r.schema.map(field => new UnresolvedAttribute(Seq(field.name))),
|
||||
data.logicalPlan)
|
||||
|
||||
// For partitioned relation r, r.schema's column ordering can be different from the column
|
||||
// ordering of data.logicalPlan (partition columns are all moved after data column). This
|
||||
// will be adjusted within InsertIntoHadoopFsRelation.
|
||||
sqlContext.executePlan(
|
||||
InsertIntoHadoopFsRelation(
|
||||
r,
|
||||
project,
|
||||
data.logicalPlan,
|
||||
mode)).toRdd
|
||||
r
|
||||
case _ =>
|
||||
|
|
|
@ -503,7 +503,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
|
|||
*/
|
||||
override lazy val schema: StructType = {
|
||||
val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
|
||||
StructType(dataSchema ++ partitionSpec.partitionColumns.filterNot { column =>
|
||||
StructType(dataSchema ++ partitionColumns.filterNot { column =>
|
||||
dataSchemaColumnNames.contains(column.name.toLowerCase)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -17,6 +17,9 @@
|
|||
|
||||
package org.apache.spark.sql.sources
|
||||
|
||||
import java.io.File
|
||||
|
||||
import com.google.common.io.Files
|
||||
import org.apache.hadoop.fs.Path
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
|
@ -454,6 +457,20 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-7616: adjust column name order accordingly when saving partitioned table") {
|
||||
val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
|
||||
|
||||
df.write
|
||||
.format(dataSourceName)
|
||||
.mode(SaveMode.Overwrite)
|
||||
.partitionBy("c", "a")
|
||||
.saveAsTable("t")
|
||||
|
||||
withTable("t") {
|
||||
checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
|
||||
|
@ -535,20 +552,6 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
|
|||
}
|
||||
}
|
||||
|
||||
test("SPARK-7616: adjust column name order accordingly when saving partitioned table") {
|
||||
val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
|
||||
|
||||
df.write
|
||||
.format("parquet")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.partitionBy("c", "a")
|
||||
.saveAsTable("t")
|
||||
|
||||
withTable("t") {
|
||||
checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-7868: _temporary directories should be ignored") {
|
||||
withTempPath { dir =>
|
||||
val df = Seq("a", "b", "c").zipWithIndex.toDF()
|
||||
|
@ -564,4 +567,32 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
|
|||
checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df.collect())
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-8014: Avoid scanning output directory when SaveMode isn't SaveMode.Append") {
|
||||
withTempDir { dir =>
|
||||
val path = dir.getCanonicalPath
|
||||
val df = Seq(1 -> "a").toDF()
|
||||
|
||||
// Creates an arbitrary file. If this directory gets scanned, ParquetRelation2 will throw
|
||||
// since it's not a valid Parquet file.
|
||||
val emptyFile = new File(path, "empty")
|
||||
Files.createParentDirs(emptyFile)
|
||||
Files.touch(emptyFile)
|
||||
|
||||
// This shouldn't throw anything.
|
||||
df.write.format("parquet").mode(SaveMode.Ignore).save(path)
|
||||
|
||||
// This should only complain that the destination directory already exists, rather than file
|
||||
// "empty" is not a Parquet file.
|
||||
assert {
|
||||
intercept[RuntimeException] {
|
||||
df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
|
||||
}.getMessage.contains("already exists")
|
||||
}
|
||||
|
||||
// This shouldn't throw anything.
|
||||
df.write.format("parquet").mode(SaveMode.Overwrite).save(path)
|
||||
checkAnswer(read.format("parquet").load(path), df)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue