[SPARK-8888][SQL] Use java.util.HashMap in DynamicPartitionWriterContainer.
Just a baby step towards making it more efficient.

Author: Reynold Xin <rxin@databricks.com>

Closes #7282 from rxin/SPARK-8888 and squashes the following commits:

3da51ae [Reynold Xin] [SPARK-8888][SQL] Use java.util.HashMap in DynamicPartitionWriterContainer.
parent 0ba98c04c7
commit f61c989b40
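For orientation, the change swaps the per-partition writer cache from a Scala mutable.Map looked up with getOrElseUpdate to a plain java.util.HashMap with an explicit get / null-check / put. Below is a minimal, self-contained sketch of the two lookup styles, using String as a stand-in for OutputWriter; the names (scalaCache, javaCache, lookupOld, lookupNew) are illustrative only and do not appear in the Spark source.

import scala.collection.mutable

// Old style: Scala mutable.Map with getOrElseUpdate. The second argument is
// by-name, so every lookup allocates a thunk even when the key is present.
val scalaCache = mutable.Map.empty[String, String]
def lookupOld(partitionPath: String): String =
  scalaCache.getOrElseUpdate(partitionPath, s"writer-for-$partitionPath")

// New style: java.util.HashMap with an explicit get / null-check / put,
// mirroring the pattern the diff below introduces in outputWriterForRow.
val javaCache = new java.util.HashMap[String, String]
def lookupNew(partitionPath: String): String = {
  val cached = javaCache.get(partitionPath)
  if (cached eq null) {
    val created = s"writer-for-$partitionPath"
    javaCache.put(partitionPath, created)
    created
  } else {
    cached
  }
}

Since outputWriterForRow is invoked for every row written, trimming this kind of per-lookup overhead is presumably the "baby step" the commit message refers to.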
@@ -19,8 +19,6 @@ package org.apache.spark.sql.sources
 import java.util.{Date, UUID}
 
-import scala.collection.mutable
-
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
 
@@ -110,7 +108,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
         !exists
     }
     // If we are appending data to an existing dir.
-    val isAppend = (pathExists) && (mode == SaveMode.Append)
+    val isAppend = pathExists && (mode == SaveMode.Append)
 
     if (doInsertion) {
       val job = new Job(hadoopConf)
@@ -142,9 +140,12 @@ private[sql] case class InsertIntoHadoopFsRelation(
       }
     }
 
-    Seq.empty[InternalRow]
+    Seq.empty[Row]
   }
 
+  /**
+   * Inserts the content of the [[DataFrame]] into a table without any partitioning columns.
+   */
   private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
     // Uses local vals for serialization
     val needsConversion = relation.needConversion
@@ -188,6 +189,9 @@ private[sql] case class InsertIntoHadoopFsRelation(
       }
     }
 
+  /**
+   * Inserts the content of the [[DataFrame]] into a table with partitioning columns.
+   */
   private def insertWithDynamicPartitions(
       sqlContext: SQLContext,
       writerContainer: BaseWriterContainer,
@@ -497,13 +501,14 @@ private[sql] class DynamicPartitionWriterContainer(
   extends BaseWriterContainer(relation, job, isAppend) {
 
   // All output writers are created on executor side.
-  @transient protected var outputWriters: mutable.Map[String, OutputWriter] = _
+  @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
 
   override protected def initWriters(): Unit = {
-    outputWriters = mutable.Map.empty[String, OutputWriter]
+    outputWriters = new java.util.HashMap[String, OutputWriter]
   }
 
   override def outputWriterForRow(row: Row): OutputWriter = {
+    // TODO (SPARK-8888): zip and all the stuff happening here is very inefficient.
     val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, rawValue) =>
       val string = if (rawValue == null) null else String.valueOf(rawValue)
       val valueString = if (string == null || string.isEmpty) {
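For context on the hot path flagged by the new TODO, the zip/map that begins in the hunk above (its mkString appears at the top of the next hunk) turns one row's partition-column values into a Hive-style col=value/col=value directory fragment. A standalone sketch with made-up column names and values follows; the fallback string for null or empty values is assumed to be Hive's default partition name, since that branch is truncated in the diff:

// Hypothetical partition columns and the matching values from one row.
val partitionColumns = Seq("year", "month")
val rowValues = Seq[Any](2015, null)

// Assumed fallback for null/empty values; the real constant lives outside
// the lines shown in this diff.
val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"
val separator = "/" // stands in for org.apache.hadoop.fs.Path.SEPARATOR

val partitionPath = partitionColumns.zip(rowValues).map { case (col, rawValue) =>
  val string = if (rawValue == null) null else String.valueOf(rawValue)
  val valueString = if (string == null || string.isEmpty) defaultPartitionName else string
  s"/$col=$valueString"
}.mkString.stripPrefix(separator)

// partitionPath == "year=2015/month=__HIVE_DEFAULT_PARTITION__"

The TODO added above flags exactly this per-row zip and Seq allocation as the next thing to optimize.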
@@ -514,18 +519,23 @@ private[sql] class DynamicPartitionWriterContainer(
       s"/$col=$valueString"
     }.mkString.stripPrefix(Path.SEPARATOR)
 
-    outputWriters.getOrElseUpdate(partitionPath, {
+    val writer = outputWriters.get(partitionPath)
+    if (writer.eq(null)) {
       val path = new Path(getWorkPath, partitionPath)
-      taskAttemptContext.getConfiguration.set(
-        "spark.sql.sources.output.path",
+      taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path",
         new Path(outputPath, partitionPath).toString)
-      outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
-    })
+      val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+      outputWriters.put(partitionPath, newWriter)
+      newWriter
+    } else {
+      writer
+    }
   }
 
   private def clearOutputWriters(): Unit = {
-    if (outputWriters.nonEmpty) {
-      outputWriters.values.foreach(_.close())
+    if (!outputWriters.isEmpty) {
+      val iter = scala.collection.JavaConversions.asScalaIterator(outputWriters.values().iterator())
+      iter.foreach(_.close())
       outputWriters.clear()
     }
   }