[SPARK-8890] [SQL] Fallback on sorting when writing many dynamic partitions

Previously, we would open a new file for each new dynamic partition written out using `HadoopFsRelation`.  For formats like Parquet this is very costly due to the buffers required to get good compression.  In this PR I refactor the code, allowing us to fall back on an external sort when many partitions are seen.  As a result, each task will open no more than `spark.sql.sources.maxConcurrentWrites` files.  I also did the following cleanup:

 - Instead of keying the file HashMap on an expensive-to-compute string representation of the partition, we now use a fairly cheap UnsafeProjection that avoids heap allocations.
 - The control flow for instantiating and invoking a writer container has been simplified.  Instead of switching in two places based on the use of partitioning, each specific writer container must implement a single method, `writeRows`, that is invoked using `runJob`.
 - `OutputWriterInternal` has been removed.  Instead we have a `private[sql]` method `writeInternal` that converts and calls the public method.  This method can be overridden by internal data sources to avoid the conversion.  This change removes a lot of code duplication and per-row `asInstanceOf` checks.
 - `commands.scala` has been split up.
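
Roughly, the refactored write path keeps a map of open writers and switches to a sort-based spill once the threshold is hit. The sketch below is a simplified, stand-alone illustration of that control flow, not the actual implementation: `writePartitioned`, `newWriter`, `maxOpenFiles`, and the in-memory `overflow` buffer (standing in for `UnsafeKVExternalSorter`) are all illustrative names.

import scala.collection.mutable

// Simplified sketch: write (partition key, row) pairs, opening at most `maxOpenFiles` writers.
// `newWriter` returns a (write, close) pair of functions for a given partition key.
def writePartitioned[K, V](
    rows: Iterator[(K, V)],
    maxOpenFiles: Int,
    newWriter: K => (V => Unit, () => Unit)): Unit = {
  val open = mutable.Map.empty[K, (V => Unit, () => Unit)]
  val overflow = mutable.ArrayBuffer.empty[(K, V)]  // stands in for the external sorter
  var spilled = false

  for ((key, row) <- rows) {
    if (spilled) {
      overflow += ((key, row))                      // once we spill, everything is buffered/sorted
    } else open.get(key) match {
      case Some((write, _)) => write(row)
      case None if open.size < maxOpenFiles =>
        val w = newWriter(key)
        open(key) = w
        w._1(row)
      case None =>
        spilled = true                              // too many open files: fall back on sorting
        overflow += ((key, row))
    }
  }

  // Write the spilled rows grouped by key, keeping only one extra file open at a time,
  // reusing an already-open writer for a key when one exists.
  var current: Option[(K, (V => Unit, () => Unit))] = None
  for ((key, row) <- overflow.sortBy(_._1.toString)) {
    if (!current.exists(_._1 == key)) {
      current.foreach(_._2._2())                    // close the previous partition's writer
      current = Some(key -> open.remove(key).getOrElse(newWriter(key)))
    }
    current.get._2._1(row)
  }
  current.foreach(_._2._2())
  open.values.foreach(_._2())                       // close all remaining writers
}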

Author: Michael Armbrust <michael@databricks.com>

Closes #8010 from marmbrus/fsWriting and squashes the following commits:

00804fe [Michael Armbrust] use shuffleMemoryManager.pageSizeBytes
775cc49 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into fsWriting
17b690e [Michael Armbrust] remove comment
40f0372 [Michael Armbrust] address comments
f5675bd [Michael Armbrust] char -> string
7e2d0a4 [Michael Armbrust] make sure we close current writer
8100100 [Michael Armbrust] delete empty commands.scala
71cc717 [Michael Armbrust] update comment
8ec75ac [Michael Armbrust] [SPARK-8890][SQL] Fallback on sorting when writing many dynamic partitions
Michael Armbrust 2015-08-07 16:24:50 -07:00
parent 902334fd55
commit 49702bd738
10 changed files with 715 additions and 623 deletions


@@ -366,17 +366,21 @@ private[spark] object SQLConf {
"storing additional schema information in Hive's metastore.",
isPublic = false)
// Whether to perform partition discovery when loading external data sources. Default to true.
val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
defaultValue = Some(true),
doc = "When true, automatically discover data partitions.")
// Whether to perform partition column type inference. Default to true.
val PARTITION_COLUMN_TYPE_INFERENCE =
booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
defaultValue = Some(true),
doc = "When true, automatically infer the data types for partitioned columns.")
val PARTITION_MAX_FILES =
intConf("spark.sql.sources.maxConcurrentWrites",
defaultValue = Some(5),
doc = "The maximum number of concurrent files to open before falling back on sorting when " +
"writing out files using dynamic partitioning.")
// The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
//
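
For reference, the new threshold can be tuned per session. A minimal, hypothetical snippet (it assumes an existing Spark 1.5-era `sqlContext` and DataFrame `df`; the column `date`, the output path, and the value 10 are made up):

// Raise the per-task limit on concurrently open files before the sort fallback kicks in.
sqlContext.setConf("spark.sql.sources.maxConcurrentWrites", "10")

// Subsequent dynamically partitioned writes will open at most 10 files per task.
df.write.partitionBy("date").parquet("/tmp/events")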


@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
import java.io.IOException
import java.util.{Date, UUID}
import scala.collection.JavaConversions.asScalaIterator
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.{Utils, SerializableConfiguration}
/**
* Inserts the results of `query` in to a relation that extends [[InsertableRelation]].
*/
private[sql] case class InsertIntoDataSource(
logicalRelation: LogicalRelation,
query: LogicalPlan,
overwrite: Boolean)
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
val data = DataFrame(sqlContext, query)
// Apply the schema of the existing table to the new data.
val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
relation.insert(df, overwrite)
// Invalidate the cache.
sqlContext.cacheManager.invalidateCache(logicalRelation)
Seq.empty[Row]
}
}
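
For context, a rough sketch of what ends up planned as `InsertIntoDataSource`; the table names are made up, and this assumes the target table's relation implements `InsertableRelation`:

// Hypothetical: `jdbc_events` is assumed to be a data source table whose relation
// implements InsertableRelation, so this append is planned as InsertIntoDataSource,
// which re-applies the table schema and invalidates any cached copy of the table.
sqlContext.table("staging_events").write.insertInto("jdbc_events")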


@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
import java.io.IOException
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
import org.apache.spark.sql.sources._
import org.apache.spark.util.Utils
/**
* A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
* Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
* single write job, and owns a UUID that identifies this job. Each concrete implementation of
* [[HadoopFsRelation]] should use this UUID together with the task id to generate a unique file
* path for each task output file. This UUID is passed to the executor side via a property named
* `spark.sql.sources.writeJobUUID`.
*
* Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
* are used to write to normal tables and tables with dynamic partitions.
*
* Basic work flow of this command is:
*
* 1. Driver side setup, including output committer initialization and data source specific
* preparation work for the write job to be issued.
* 2. Issues a write job consisting of one or more executor-side tasks, each of which writes all
*    rows within an RDD partition.
* 3. If no exception is thrown in a task, commits that task; otherwise aborts it. If any
*    exception is thrown during task commitment, also aborts that task.
* 4. If all tasks are committed, commits the job; otherwise aborts it. If any exception is
*    thrown during job commitment, also aborts the job.
*/
private[sql] case class InsertIntoHadoopFsRelation(
@transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
mode: SaveMode)
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
require(
relation.paths.length == 1,
s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
val outputPath = new Path(relation.paths.head)
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val pathExists = fs.exists(qualifiedOutputPath)
val doInsertion = (mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
case (SaveMode.Overwrite, true) =>
Utils.tryOrIOException {
if (!fs.delete(qualifiedOutputPath, true /* recursively */)) {
throw new IOException(s"Unable to clear output " +
s"directory $qualifiedOutputPath prior to writing to it")
}
}
true
case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
true
case (SaveMode.Ignore, exists) =>
!exists
case (s, exists) =>
throw new IllegalStateException(s"unsupported save mode $s ($exists)")
}
// If we are appending data to an existing dir.
val isAppend = pathExists && (mode == SaveMode.Append)
if (doInsertion) {
val job = new Job(hadoopConf)
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
// A partitioned relation's schema can be different from that of the input logicalPlan, since
// partition columns are all moved after the data columns. We add a Project to adjust the ordering.
// TODO: this belongs in the analyzer.
val project = Project(
relation.schema.map(field => UnresolvedAttribute.quoted(field.name)), query)
val queryExecution = DataFrame(sqlContext, project).queryExecution
SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
val df = sqlContext.internalCreateDataFrame(queryExecution.toRdd, relation.schema)
val partitionColumns = relation.partitionColumns.fieldNames
// Some pre-flight checks.
require(
df.schema == relation.schema,
s"""DataFrame must have the same schema as the relation to which is inserted.
|DataFrame schema: ${df.schema}
|Relation schema: ${relation.schema}
""".stripMargin)
val partitionColumnsInSpec = relation.partitionColumns.fieldNames
require(
partitionColumnsInSpec.sameElements(partitionColumns),
s"""Partition columns mismatch.
|Expected: ${partitionColumnsInSpec.mkString(", ")}
|Actual: ${partitionColumns.mkString(", ")}
""".stripMargin)
val writerContainer = if (partitionColumns.isEmpty) {
new DefaultWriterContainer(relation, job, isAppend)
} else {
val output = df.queryExecution.executedPlan.output
val (partitionOutput, dataOutput) =
output.partition(a => partitionColumns.contains(a.name))
new DynamicPartitionWriterContainer(
relation,
job,
partitionOutput,
dataOutput,
output,
PartitioningUtils.DEFAULT_PARTITION_NAME,
sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES),
isAppend)
}
// This call shouldn't be put into the `try` block below because it only initializes and
// prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
writerContainer.driverSideSetup()
try {
sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writerContainer.writeRows _)
writerContainer.commitJob()
relation.refresh()
} catch { case cause: Throwable =>
logError("Aborting job.", cause)
writerContainer.abortJob()
throw new SparkException("Job aborted.", cause)
}
}
} else {
logInfo("Skipping insertion into a relation that already exists.")
}
Seq.empty[Row]
}
}
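
To make the `(mode, pathExists)` handling above concrete, here is a rough user-level illustration of each `SaveMode` when the destination directory already exists (the DataFrame `df` and the path are made up):

import org.apache.spark.sql.SaveMode

// Destination "/data/out" is assumed to already exist.
df.write.mode(SaveMode.ErrorIfExists).parquet("/data/out") // throws AnalysisException
df.write.mode(SaveMode.Ignore).parquet("/data/out")        // skips the insertion entirely
df.write.mode(SaveMode.Overwrite).parquet("/data/out")     // deletes the path, then writes
df.write.mode(SaveMode.Append).parquet("/data/out")        // appends; custom output committers
                                                           // are ignored when appending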


@@ -0,0 +1,404 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
import java.util.{Date, UUID}
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter}
import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.UnsafeKVExternalSorter
import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.{StructType, StringType}
import org.apache.spark.util.SerializableConfiguration
private[sql] abstract class BaseWriterContainer(
@transient val relation: HadoopFsRelation,
@transient job: Job,
isAppend: Boolean)
extends SparkHadoopMapReduceUtil
with Logging
with Serializable {
protected val dataSchema = relation.dataSchema
protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
// This UUID is used to avoid output file name collision between different appending write jobs.
// These jobs may belong to different SparkContext instances. Concrete data source implementations
// may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
// The reason why this ID is used to identify a job rather than a single task output file is
// that, speculative tasks must generate the same output file name as the original task.
private val uniqueWriteJobId = UUID.randomUUID()
// This is only used on driver side.
@transient private val jobContext: JobContext = job
// The following fields are initialized and used on both driver and executor side.
@transient protected var outputCommitter: OutputCommitter = _
@transient private var jobId: JobID = _
@transient private var taskId: TaskID = _
@transient private var taskAttemptId: TaskAttemptID = _
@transient protected var taskAttemptContext: TaskAttemptContext = _
protected val outputPath: String = {
assert(
relation.paths.length == 1,
s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
relation.paths.head
}
protected var outputWriterFactory: OutputWriterFactory = _
private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit
def driverSideSetup(): Unit = {
setupIDs(0, 0, 0)
setupConf()
// This UUID is sent to executor side together with the serialized `Configuration` object within
// the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
// unique task output files.
job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
// configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
//
// Also, the `prepareJobForWrite` call must happen before initializing output format and output
// committer, since their initialization involves the job configuration, which can potentially be
// decorated in `prepareJobForWrite`.
outputWriterFactory = relation.prepareJobForWrite(job)
taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
outputFormatClass = job.getOutputFormatClass
outputCommitter = newOutputCommitter(taskAttemptContext)
outputCommitter.setupJob(jobContext)
}
def executorSideSetup(taskContext: TaskContext): Unit = {
setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
setupConf()
taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
outputCommitter = newOutputCommitter(taskAttemptContext)
outputCommitter.setupTask(taskAttemptContext)
}
protected def getWorkPath: String = {
outputCommitter match {
// FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
case _ => outputPath
}
}
private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
if (isAppend) {
// If we are appending data to an existing dir, we will only use the output committer
// associated with the file output format since it is not safe to use a custom
// committer for appending. For example, in S3, direct parquet output committer may
// leave partial data in the destination dir when the appending job fails.
logInfo(
s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
"for appending.")
defaultOutputCommitter
} else {
val committerClass = context.getConfiguration.getClass(
SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
Option(committerClass).map { clazz =>
logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
// Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
// has an associated output committer. To override this output committer,
// we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
// If a data source needs to override the output committer, it needs to set the
// output committer in prepareForWrite method.
if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
// The specified output committer is a FileOutputCommitter.
// So, we will use the FileOutputCommitter-specified constructor.
val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
ctor.newInstance(new Path(outputPath), context)
} else {
// The specified output committer is just an OutputCommitter.
// So, we will use the no-argument constructor.
val ctor = clazz.getDeclaredConstructor()
ctor.newInstance()
}
}.getOrElse {
// If output committer class is not set, we will use the one associated with the
// file output format.
logInfo(
s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
defaultOutputCommitter
}
}
}
private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
this.taskId = new TaskID(this.jobId, true, splitId)
this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
}
private def setupConf(): Unit = {
serializableConf.value.set("mapred.job.id", jobId.toString)
serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
serializableConf.value.setBoolean("mapred.task.is.map", true)
serializableConf.value.setInt("mapred.task.partition", 0)
}
def commitTask(): Unit = {
SparkHadoopMapRedUtil.commitTask(
outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
}
def abortTask(): Unit = {
if (outputCommitter != null) {
outputCommitter.abortTask(taskAttemptContext)
}
logError(s"Task attempt $taskAttemptId aborted.")
}
def commitJob(): Unit = {
outputCommitter.commitJob(jobContext)
logInfo(s"Job $jobId committed.")
}
def abortJob(): Unit = {
if (outputCommitter != null) {
outputCommitter.abortJob(jobContext, JobStatus.State.FAILED)
}
logError(s"Job $jobId aborted.")
}
}
/**
* A writer that writes all of the rows in a partition to a single file.
*/
private[sql] class DefaultWriterContainer(
@transient relation: HadoopFsRelation,
@transient job: Job,
isAppend: Boolean)
extends BaseWriterContainer(relation, job, isAppend) {
def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
executorSideSetup(taskContext)
taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
writer.initConverter(dataSchema)
// If anything below fails, we should abort the task.
try {
while (iterator.hasNext) {
val internalRow = iterator.next()
writer.writeInternal(internalRow)
}
commitTask()
} catch {
case cause: Throwable =>
logError("Aborting task.", cause)
abortTask()
throw new SparkException("Task failed while writing rows.", cause)
}
def commitTask(): Unit = {
try {
assert(writer != null, "OutputWriter instance should have been initialized")
writer.close()
super.commitTask()
} catch {
case cause: Throwable =>
// This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and
// will cause `abortTask()` to be invoked.
throw new RuntimeException("Failed to commit task", cause)
}
}
def abortTask(): Unit = {
try {
writer.close()
} finally {
super.abortTask()
}
}
}
}
/**
* A writer that dynamically opens files based on the given partition columns. Internally this is
* done by maintaining a HashMap of open files until `maxOpenFiles` is reached. If this occurs, the
* writer externally sorts the remaining rows and then writes them out one file at a time.
*/
private[sql] class DynamicPartitionWriterContainer(
@transient relation: HadoopFsRelation,
@transient job: Job,
partitionColumns: Seq[Attribute],
dataColumns: Seq[Attribute],
inputSchema: Seq[Attribute],
defaultPartitionName: String,
maxOpenFiles: Int,
isAppend: Boolean)
extends BaseWriterContainer(relation, job, isAppend) {
def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
executorSideSetup(taskContext)
// Returns the partition key given an input row
val getPartitionKey = UnsafeProjection.create(partitionColumns, inputSchema)
// Returns the data columns to be written given an input row
val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema)
// Expressions that given a partition key build a string like: col1=val/col2=val/...
val partitionStringExpression = partitionColumns.zipWithIndex.flatMap { case (c, i) =>
val escaped =
ScalaUDF(
PartitioningUtils.escapePathName _, StringType, Seq(Cast(c, StringType)), Seq(StringType))
val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
val partitionName = Literal(c.name + "=") :: str :: Nil
if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName
}
// Returns the partition path given a partition key.
val getPartitionString =
UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns)
// If anything below fails, we should abort the task.
try {
// This will be filled in if we have to fall back on sorting.
var sorter: UnsafeKVExternalSorter = null
while (iterator.hasNext && sorter == null) {
val inputRow = iterator.next()
val currentKey = getPartitionKey(inputRow)
var currentWriter = outputWriters.get(currentKey)
if (currentWriter == null) {
if (outputWriters.size < maxOpenFiles) {
currentWriter = newOutputWriter(currentKey)
outputWriters.put(currentKey.copy(), currentWriter)
currentWriter.writeInternal(getOutputRow(inputRow))
} else {
logInfo(s"Maximum partitions reached, falling back on sorting.")
sorter = new UnsafeKVExternalSorter(
StructType.fromAttributes(partitionColumns),
StructType.fromAttributes(dataColumns),
SparkEnv.get.blockManager,
SparkEnv.get.shuffleMemoryManager,
SparkEnv.get.shuffleMemoryManager.pageSizeBytes)
sorter.insertKV(currentKey, getOutputRow(inputRow))
}
} else {
currentWriter.writeInternal(getOutputRow(inputRow))
}
}
// If the sorter is not null, we hit the open-file limit above and need to finish writing the
// remaining rows using the external sorter.
if (sorter != null) {
while (iterator.hasNext) {
val currentRow = iterator.next()
sorter.insertKV(getPartitionKey(currentRow), getOutputRow(currentRow))
}
logInfo(s"Sorting complete. Writing out partition files one at a time.")
val sortedIterator = sorter.sortedIterator()
var currentKey: InternalRow = null
var currentWriter: OutputWriter = null
try {
while (sortedIterator.next()) {
if (currentKey != sortedIterator.getKey) {
if (currentWriter != null) {
currentWriter.close()
}
currentKey = sortedIterator.getKey.copy()
logDebug(s"Writing partition: $currentKey")
// Either use an existing file from before, or open a new one.
currentWriter = outputWriters.remove(currentKey)
if (currentWriter == null) {
currentWriter = newOutputWriter(currentKey)
}
}
currentWriter.writeInternal(sortedIterator.getValue)
}
} finally {
if (currentWriter != null) { currentWriter.close() }
}
}
commitTask()
} catch {
case cause: Throwable =>
logError("Aborting task.", cause)
abortTask()
throw new SparkException("Task failed while writing rows.", cause)
}
/** Opens and returns a new OutputWriter given a partition key. */
def newOutputWriter(key: InternalRow): OutputWriter = {
val partitionPath = getPartitionString(key).getString(0)
val path = new Path(getWorkPath, partitionPath)
taskAttemptContext.getConfiguration.set(
"spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
newWriter.initConverter(dataSchema)
newWriter
}
def clearOutputWriters(): Unit = {
outputWriters.asScala.values.foreach(_.close())
outputWriters.clear()
}
def commitTask(): Unit = {
try {
clearOutputWriters()
super.commitTask()
} catch {
case cause: Throwable =>
throw new RuntimeException("Failed to commit task", cause)
}
}
def abortTask(): Unit = {
try {
clearOutputWriters()
} finally {
super.abortTask()
}
}
}
}
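
As a concrete illustration of the partition-path expression assembled in `writeRows` above: each key yields one `col=value` fragment per partition column, joined with `/`, with nulls replaced by the default partition name and values escaped via `PartitioningUtils.escapePathName`. A simplified, hypothetical rendering of that logic (the default partition name used in the example call is an assumption):

// Simplified stand-in for the Concat(partitionStringExpression) projection above.
// `escape` stands in for PartitioningUtils.escapePathName.
def partitionPath(
    values: Seq[(String, Option[String])],  // (column name, value); None means null
    escape: String => String,
    defaultPartitionName: String): String =
  values.map { case (col, v) =>
    s"$col=${v.map(escape).getOrElse(defaultPartitionName)}"
  }.mkString("/")

// partitionPath(Seq("year" -> Some("2015"), "city" -> None), identity, "__HIVE_DEFAULT_PARTITION__")
// => "year=2015/city=__HIVE_DEFAULT_PARTITION__"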


@@ -1,606 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
import java.io.IOException
import java.util.{Date, UUID}
import scala.collection.JavaConversions.asScalaIterator
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.{Utils, SerializableConfiguration}
private[sql] case class InsertIntoDataSource(
logicalRelation: LogicalRelation,
query: LogicalPlan,
overwrite: Boolean)
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
val data = DataFrame(sqlContext, query)
// Apply the schema of the existing table to the new data.
val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
relation.insert(df, overwrite)
// Invalidate the cache.
sqlContext.cacheManager.invalidateCache(logicalRelation)
Seq.empty[Row]
}
}
/**
* A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
* Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
* single write job, and owns a UUID that identifies this job. Each concrete implementation of
* [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for
* each task output file. This UUID is passed to executor side via a property named
* `spark.sql.sources.writeJobUUID`.
*
* Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
* are used to write to normal tables and tables with dynamic partitions.
*
* Basic work flow of this command is:
*
* 1. Driver side setup, including output committer initialization and data source specific
* preparation work for the write job to be issued.
* 2. Issues a write job consists of one or more executor side tasks, each of which writes all
* rows within an RDD partition.
* 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any
* exception is thrown during task commitment, also aborts that task.
* 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is
* thrown during job commitment, also aborts the job.
*/
private[sql] case class InsertIntoHadoopFsRelation(
@transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
mode: SaveMode)
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
require(
relation.paths.length == 1,
s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
val outputPath = new Path(relation.paths.head)
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val pathExists = fs.exists(qualifiedOutputPath)
val doInsertion = (mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
case (SaveMode.Overwrite, true) =>
Utils.tryOrIOException {
if (!fs.delete(qualifiedOutputPath, true /* recursively */)) {
throw new IOException(s"Unable to clear output " +
s"directory $qualifiedOutputPath prior to writing to it")
}
}
true
case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
true
case (SaveMode.Ignore, exists) =>
!exists
case (s, exists) =>
throw new IllegalStateException(s"unsupported save mode $s ($exists)")
}
// If we are appending data to an existing dir.
val isAppend = pathExists && (mode == SaveMode.Append)
if (doInsertion) {
val job = new Job(hadoopConf)
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
// We create a DataFrame by applying the schema of relation to the data to make sure.
// We are writing data based on the expected schema,
// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). We
// need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can
// safely apply the schema of r.schema to the data.
val project = Project(
relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
val queryExecution = DataFrame(sqlContext, project).queryExecution
SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
val df = sqlContext.internalCreateDataFrame(queryExecution.toRdd, relation.schema)
val partitionColumns = relation.partitionColumns.fieldNames
if (partitionColumns.isEmpty) {
insert(new DefaultWriterContainer(relation, job, isAppend), df)
} else {
val writerContainer = new DynamicPartitionWriterContainer(
relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
}
}
}
Seq.empty[Row]
}
/**
* Inserts the content of the [[DataFrame]] into a table without any partitioning columns.
*/
private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
// Uses local vals for serialization
val needsConversion = relation.needConversion
val dataSchema = relation.dataSchema
// This call shouldn't be put into the `try` block below because it only initializes and
// prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
writerContainer.driverSideSetup()
try {
df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
writerContainer.commitJob()
relation.refresh()
} catch { case cause: Throwable =>
logError("Aborting job.", cause)
writerContainer.abortJob()
throw new SparkException("Job aborted.", cause)
}
def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
// If anything below fails, we should abort the task.
try {
writerContainer.executorSideSetup(taskContext)
if (needsConversion) {
val converter = CatalystTypeConverters.createToScalaConverter(dataSchema)
.asInstanceOf[InternalRow => Row]
while (iterator.hasNext) {
val internalRow = iterator.next()
writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
}
} else {
while (iterator.hasNext) {
val internalRow = iterator.next()
writerContainer.outputWriterForRow(internalRow)
.asInstanceOf[OutputWriterInternal].writeInternal(internalRow)
}
}
writerContainer.commitTask()
} catch { case cause: Throwable =>
logError("Aborting task.", cause)
writerContainer.abortTask()
throw new SparkException("Task failed while writing rows.", cause)
}
}
}
/**
* Inserts the content of the [[DataFrame]] into a table with partitioning columns.
*/
private def insertWithDynamicPartitions(
sqlContext: SQLContext,
writerContainer: BaseWriterContainer,
df: DataFrame,
partitionColumns: Array[String]): Unit = {
// Uses a local val for serialization
val needsConversion = relation.needConversion
val dataSchema = relation.dataSchema
require(
df.schema == relation.schema,
s"""DataFrame must have the same schema as the relation to which is inserted.
|DataFrame schema: ${df.schema}
|Relation schema: ${relation.schema}
""".stripMargin)
val partitionColumnsInSpec = relation.partitionColumns.fieldNames
require(
partitionColumnsInSpec.sameElements(partitionColumns),
s"""Partition columns mismatch.
|Expected: ${partitionColumnsInSpec.mkString(", ")}
|Actual: ${partitionColumns.mkString(", ")}
""".stripMargin)
val output = df.queryExecution.executedPlan.output
val (partitionOutput, dataOutput) = output.partition(a => partitionColumns.contains(a.name))
val codegenEnabled = df.sqlContext.conf.codegenEnabled
// This call shouldn't be put into the `try` block below because it only initializes and
// prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
writerContainer.driverSideSetup()
try {
df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
writerContainer.commitJob()
relation.refresh()
} catch { case cause: Throwable =>
logError("Aborting job.", cause)
writerContainer.abortJob()
throw new SparkException("Job aborted.", cause)
}
def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
// If anything below fails, we should abort the task.
try {
writerContainer.executorSideSetup(taskContext)
// Projects all partition columns and casts them to strings to build partition directories.
val partitionCasts = partitionOutput.map(Cast(_, StringType))
val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
val dataProj = newProjection(codegenEnabled, dataOutput, output)
if (needsConversion) {
val converter = CatalystTypeConverters.createToScalaConverter(dataSchema)
.asInstanceOf[InternalRow => Row]
while (iterator.hasNext) {
val internalRow = iterator.next()
val partitionPart = partitionProj(internalRow)
val dataPart = converter(dataProj(internalRow))
writerContainer.outputWriterForRow(partitionPart).write(dataPart)
}
} else {
while (iterator.hasNext) {
val internalRow = iterator.next()
val partitionPart = partitionProj(internalRow)
val dataPart = dataProj(internalRow)
writerContainer.outputWriterForRow(partitionPart)
.asInstanceOf[OutputWriterInternal].writeInternal(dataPart)
}
}
writerContainer.commitTask()
} catch { case cause: Throwable =>
logError("Aborting task.", cause)
writerContainer.abortTask()
throw new SparkException("Task failed while writing rows.", cause)
}
}
}
// This is copied from SparkPlan, probably should move this to a more general place.
private def newProjection(
codegenEnabled: Boolean,
expressions: Seq[Expression],
inputSchema: Seq[Attribute]): Projection = {
log.debug(
s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
if (codegenEnabled) {
try {
GenerateProjection.generate(expressions, inputSchema)
} catch {
case e: Exception =>
if (sys.props.contains("spark.testing")) {
throw e
} else {
log.error("failed to generate projection, fallback to interpreted", e)
new InterpretedProjection(expressions, inputSchema)
}
}
} else {
new InterpretedProjection(expressions, inputSchema)
}
}
}
private[sql] abstract class BaseWriterContainer(
@transient val relation: HadoopFsRelation,
@transient job: Job,
isAppend: Boolean)
extends SparkHadoopMapReduceUtil
with Logging
with Serializable {
protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
// This UUID is used to avoid output file name collision between different appending write jobs.
// These jobs may belong to different SparkContext instances. Concrete data source implementations
// may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
// The reason why this ID is used to identify a job rather than a single task output file is
// that, speculative tasks must generate the same output file name as the original task.
private val uniqueWriteJobId = UUID.randomUUID()
// This is only used on driver side.
@transient private val jobContext: JobContext = job
// The following fields are initialized and used on both driver and executor side.
@transient protected var outputCommitter: OutputCommitter = _
@transient private var jobId: JobID = _
@transient private var taskId: TaskID = _
@transient private var taskAttemptId: TaskAttemptID = _
@transient protected var taskAttemptContext: TaskAttemptContext = _
protected val outputPath: String = {
assert(
relation.paths.length == 1,
s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
relation.paths.head
}
protected val dataSchema = relation.dataSchema
protected var outputWriterFactory: OutputWriterFactory = _
private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
def driverSideSetup(): Unit = {
setupIDs(0, 0, 0)
setupConf()
// This UUID is sent to executor side together with the serialized `Configuration` object within
// the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
// unique task output files.
job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
// configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
//
// Also, the `prepareJobForWrite` call must happen before initializing output format and output
// committer, since their initialization involve the job configuration, which can be potentially
// decorated in `prepareJobForWrite`.
outputWriterFactory = relation.prepareJobForWrite(job)
taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
outputFormatClass = job.getOutputFormatClass
outputCommitter = newOutputCommitter(taskAttemptContext)
outputCommitter.setupJob(jobContext)
}
def executorSideSetup(taskContext: TaskContext): Unit = {
setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
setupConf()
taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
outputCommitter = newOutputCommitter(taskAttemptContext)
outputCommitter.setupTask(taskAttemptContext)
initWriters()
}
protected def getWorkPath: String = {
outputCommitter match {
// FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
case _ => outputPath
}
}
private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
if (isAppend) {
// If we are appending data to an existing dir, we will only use the output committer
// associated with the file output format since it is not safe to use a custom
// committer for appending. For example, in S3, direct parquet output committer may
// leave partial data in the destination dir when the the appending job fails.
logInfo(
s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
"for appending.")
defaultOutputCommitter
} else {
val committerClass = context.getConfiguration.getClass(
SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
Option(committerClass).map { clazz =>
logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
// Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
// has an associated output committer. To override this output committer,
// we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
// If a data source needs to override the output committer, it needs to set the
// output committer in prepareForWrite method.
if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
// The specified output committer is a FileOutputCommitter.
// So, we will use the FileOutputCommitter-specified constructor.
val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
ctor.newInstance(new Path(outputPath), context)
} else {
// The specified output committer is just a OutputCommitter.
// So, we will use the no-argument constructor.
val ctor = clazz.getDeclaredConstructor()
ctor.newInstance()
}
}.getOrElse {
// If output committer class is not set, we will use the one associated with the
// file output format.
logInfo(
s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
defaultOutputCommitter
}
}
}
private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
this.taskId = new TaskID(this.jobId, true, splitId)
this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
}
private def setupConf(): Unit = {
serializableConf.value.set("mapred.job.id", jobId.toString)
serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
serializableConf.value.setBoolean("mapred.task.is.map", true)
serializableConf.value.setInt("mapred.task.partition", 0)
}
// Called on executor side when writing rows
def outputWriterForRow(row: InternalRow): OutputWriter
protected def initWriters(): Unit
def commitTask(): Unit = {
SparkHadoopMapRedUtil.commitTask(
outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
}
def abortTask(): Unit = {
if (outputCommitter != null) {
outputCommitter.abortTask(taskAttemptContext)
}
logError(s"Task attempt $taskAttemptId aborted.")
}
def commitJob(): Unit = {
outputCommitter.commitJob(jobContext)
logInfo(s"Job $jobId committed.")
}
def abortJob(): Unit = {
if (outputCommitter != null) {
outputCommitter.abortJob(jobContext, JobStatus.State.FAILED)
}
logError(s"Job $jobId aborted.")
}
}
private[sql] class DefaultWriterContainer(
@transient relation: HadoopFsRelation,
@transient job: Job,
isAppend: Boolean)
extends BaseWriterContainer(relation, job, isAppend) {
@transient private var writer: OutputWriter = _
override protected def initWriters(): Unit = {
taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
}
override def outputWriterForRow(row: InternalRow): OutputWriter = writer
override def commitTask(): Unit = {
try {
assert(writer != null, "OutputWriter instance should have been initialized")
writer.close()
super.commitTask()
} catch { case cause: Throwable =>
// This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
// cause `abortTask()` to be invoked.
throw new RuntimeException("Failed to commit task", cause)
}
}
override def abortTask(): Unit = {
try {
// It's possible that the task fails before `writer` gets initialized
if (writer != null) {
writer.close()
}
} finally {
super.abortTask()
}
}
}
private[sql] class DynamicPartitionWriterContainer(
@transient relation: HadoopFsRelation,
@transient job: Job,
partitionColumns: Array[String],
defaultPartitionName: String,
isAppend: Boolean)
extends BaseWriterContainer(relation, job, isAppend) {
// All output writers are created on executor side.
@transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
override protected def initWriters(): Unit = {
outputWriters = new java.util.HashMap[String, OutputWriter]
}
// The `row` argument is supposed to only contain partition column values which have been casted
// to strings.
override def outputWriterForRow(row: InternalRow): OutputWriter = {
val partitionPath = {
val partitionPathBuilder = new StringBuilder
var i = 0
while (i < partitionColumns.length) {
val col = partitionColumns(i)
val partitionValueString = {
val string = row.getUTF8String(i)
if (string.eq(null)) {
defaultPartitionName
} else {
PartitioningUtils.escapePathName(string.toString)
}
}
if (i > 0) {
partitionPathBuilder.append(Path.SEPARATOR_CHAR)
}
partitionPathBuilder.append(s"$col=$partitionValueString")
i += 1
}
partitionPathBuilder.toString()
}
val writer = outputWriters.get(partitionPath)
if (writer.eq(null)) {
val path = new Path(getWorkPath, partitionPath)
taskAttemptContext.getConfiguration.set(
"spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
outputWriters.put(partitionPath, newWriter)
newWriter
} else {
writer
}
}
private def clearOutputWriters(): Unit = {
if (!outputWriters.isEmpty) {
asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
outputWriters.clear()
}
}
override def commitTask(): Unit = {
try {
clearOutputWriters()
super.commitTask()
} catch { case cause: Throwable =>
throw new RuntimeException("Failed to commit task", cause)
}
}
override def abortTask(): Unit = {
try {
clearOutputWriters()
} finally {
super.abortTask()
}
}
}


@@ -152,7 +152,7 @@ private[json] class JsonOutputWriter(
path: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends OutputWriterInternal with SparkHadoopMapRedUtil with Logging {
extends OutputWriter with SparkHadoopMapRedUtil with Logging {
val writer = new CharArrayWriter()
// create the Generator without separator inserted between 2 records
@@ -170,7 +170,9 @@
}.getRecordWriter(context)
}
override def writeInternal(row: InternalRow): Unit = {
override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
override protected[sql] def writeInternal(row: InternalRow): Unit = {
JacksonGenerator(dataSchema, gen, row)
gen.flush()


@@ -62,7 +62,7 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider {
// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
extends OutputWriterInternal {
extends OutputWriter {
private val recordWriter: RecordWriter[Void, InternalRow] = {
val outputFormat = {
@@ -87,7 +87,9 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
outputFormat.getRecordWriter(context)
}
override def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)
override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)
override def close(): Unit = recordWriter.close(context)
}


@@ -342,18 +342,17 @@ abstract class OutputWriter {
* @since 1.4.0
*/
def close(): Unit
}
/**
* This is an internal, private version of [[OutputWriter]] with a writeInternal method that
* accepts an [[InternalRow]] rather than a [[Row]]. Data sources that return this must have
* the conversion flag set to false.
*/
private[sql] abstract class OutputWriterInternal extends OutputWriter {
private var converter: InternalRow => Row = _
override def write(row: Row): Unit = throw new UnsupportedOperationException
protected[sql] def initConverter(dataSchema: StructType) = {
converter =
CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
}
def writeInternal(row: InternalRow): Unit
protected[sql] def writeInternal(row: InternalRow): Unit = {
write(converter(row))
}
}
/**

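The net effect of this change on writer implementations: external data sources keep implementing the public `write(Row)` and inherit the converting `writeInternal`, while internal sources override `writeInternal` directly and make `write` throw, as the JSON, Parquet, and ORC writers in this PR do. A hypothetical internal writer following that pattern, assuming the Spark 1.5-era API above:

package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.OutputWriter

// Hypothetical internal writer: skips the InternalRow -> Row conversion entirely.
class FastPathWriter extends OutputWriter {
  // Internal writers are never handed external Rows.
  override def write(row: Row): Unit =
    throw new UnsupportedOperationException("call writeInternal")

  // Receives rows in the unconverted internal representation.
  override protected[sql] def writeInternal(row: InternalRow): Unit = {
    // ... append `row` to the underlying file format ...
  }

  override def close(): Unit = {
    // ... flush buffers and release resources ...
  }
}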

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources
import org.apache.spark.sql.{Row, QueryTest}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.util.Utils
class PartitionedWriteSuite extends QueryTest {
import TestSQLContext.implicits._
test("write many partitions") {
val path = Utils.createTempDir()
path.delete()
val df = TestSQLContext.range(100).select($"id", lit(1).as("data"))
df.write.partitionBy("id").save(path.getCanonicalPath)
checkAnswer(
TestSQLContext.read.load(path.getCanonicalPath),
(0 to 99).map(Row(1, _)).toSeq)
Utils.deleteRecursively(path)
}
test("write many partitions with repeats") {
val path = Utils.createTempDir()
path.delete()
val base = TestSQLContext.range(100)
val df = base.unionAll(base).select($"id", lit(1).as("data"))
df.write.partitionBy("id").save(path.getCanonicalPath)
checkAnswer(
TestSQLContext.read.load(path.getCanonicalPath),
(0 to 99).map(Row(1, _)).toSeq ++ (0 to 99).map(Row(1, _)).toSeq)
Utils.deleteRecursively(path)
}
}


@@ -66,7 +66,7 @@ private[orc] class OrcOutputWriter(
path: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends OutputWriterInternal with SparkHadoopMapRedUtil with HiveInspectors {
extends OutputWriter with SparkHadoopMapRedUtil with HiveInspectors {
private val serializer = {
val table = new Properties()
@@ -120,7 +120,9 @@ private[orc] class OrcOutputWriter(
).asInstanceOf[RecordWriter[NullWritable, Writable]]
}
override def writeInternal(row: InternalRow): Unit = {
override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
override protected[sql] def writeInternal(row: InternalRow): Unit = {
var i = 0
while (i < row.numFields) {
reusableOutputBuffer(i) = wrappers(i)(row.get(i, dataSchema(i).dataType))