[SPARK-7718] [SQL] Speed up partitioning by avoiding closure cleaning

According to yhuai, we spent 6-7 seconds cleaning closures in a partitioning job that takes 12 seconds. Since we provide these closures in Spark, we know for sure they are serializable, so we can bypass the cleaning.
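For a concrete picture of what is being skipped: `RDD.mapPartitions` runs its closure through `SparkContext.clean` (the closure cleaner) on every call, while constructing a `MapPartitionsRDD` directly does not. Below is a minimal sketch of the two paths. It is not part of this patch; the object and method names are made up, and it assumes code compiled inside an `org.apache.spark` package, since `MapPartitionsRDD` is `private[spark]`.

// Sketch only (not part of this patch). ClosureCleaningSketch, viaMapPartitions and
// viaDirectConstruction are hypothetical names; this assumes it is compiled inside an
// org.apache.spark package, because MapPartitionsRDD is private[spark].
import org.apache.spark.TaskContext
import org.apache.spark.rdd.{MapPartitionsRDD, RDD}

object ClosureCleaningSketch {
  // Usual route: mapPartitions passes the closure through SparkContext.clean
  // before building the underlying MapPartitionsRDD.
  def viaMapPartitions(rows: RDD[Int]): RDD[Int] =
    rows.mapPartitions(iter => iter.map(_ + 1))

  // Route taken by this patch: construct the MapPartitionsRDD directly with a
  // function that is known to be serializable, skipping the cleaning step.
  def viaDirectConstruction(rows: RDD[Int]): RDD[Int] = {
    val func = (_: TaskContext, _: Int, iter: Iterator[Int]) => iter.map(_ + 1)
    new MapPartitionsRDD(rows, func, preservesPartitioning = false)
  }
}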

Author: Andrew Or <andrew@databricks.com>

Closes #6256 from andrewor14/sql-partition-speed-up and squashes the following commits:

a82b451 [Andrew Or] Fix style
10f7e3e [Andrew Or] Avoid getting call sites and cleaning closures
17e2943 [Andrew Or] Merge branch 'master' of github.com:apache/spark into sql-partition-speed-up
523f042 [Andrew Or] Skip unnecessary Utils.getCallSites too
f7fe143 [Andrew Or] Avoid unnecessary closure cleaning
Authored by Andrew Or on 2015-05-21 14:33:11 -07:00; committed by Yin Huai
parent 6b18cdc1b1
commit 5287eec5a6
4 changed files with 78 additions and 50 deletions

Changes in org.apache.spark.util.Utils:

@@ -2201,6 +2201,24 @@ private[spark] object Utils extends Logging {
     shutdownHooks.remove(ref)
   }
 
+  /**
+   * To avoid calling `Utils.getCallSite` for every single RDD we create in the body,
+   * set a dummy call site that RDDs use instead. This is for performance optimization.
+   */
+  def withDummyCallSite[T](sc: SparkContext)(body: => T): T = {
+    val oldShortCallSite = sc.getLocalProperty(CallSite.SHORT_FORM)
+    val oldLongCallSite = sc.getLocalProperty(CallSite.LONG_FORM)
+    try {
+      sc.setLocalProperty(CallSite.SHORT_FORM, "")
+      sc.setLocalProperty(CallSite.LONG_FORM, "")
+      body
+    } finally {
+      // Restore the old ones here
+      sc.setLocalProperty(CallSite.SHORT_FORM, oldShortCallSite)
+      sc.setLocalProperty(CallSite.LONG_FORM, oldLongCallSite)
+    }
+  }
+
 }
 
 private [util] class SparkShutdownHookManager {
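For illustration only (not part of the diff), a caller that creates many RDDs would wrap the creation as in the sketch below; `DummyCallSiteSketch` and `buildManyRdds` are hypothetical names, and like `Utils` itself this only compiles from inside an `org.apache.spark` package. The SQL changes further down use the helper in the same way.

// Sketch only; DummyCallSiteSketch and buildManyRdds are hypothetical.
// Utils is private[spark], so this must live inside an org.apache.spark package.
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

object DummyCallSiteSketch {
  def buildManyRdds(sc: SparkContext): Seq[RDD[Int]] = {
    // Every RDD created inside the block picks up the dummy (empty) call site from
    // the SHORT_FORM / LONG_FORM local properties instead of computing one with
    // Utils.getCallSite, once per RDD.
    Utils.withDummyCallSite(sc) {
      (1 to 1000).map(i => sc.parallelize(Seq(i)))
    }
  }
}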

Changes in the Parquet data source (DefaultSource / ParquetRelation2):

@@ -33,6 +33,7 @@ import parquet.hadoop._
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.hadoop.util.ContextUtil
 
+import org.apache.spark.{Partition => SparkPartition, SerializableWritable, Logging, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD._
@@ -40,7 +41,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
-import org.apache.spark.{Partition => SparkPartition, SparkEnv, SerializableWritable, Logging, SparkException}
+import org.apache.spark.util.Utils
 
 private[sql] class DefaultSource extends HadoopFsRelationProvider {
   override def createRelation(
@@ -264,57 +265,58 @@ private[sql] class ParquetRelation2(
     val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
 
-    // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
-    // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
-    // footers. Especially when a global arbitrative schema (either from metastore or data source
-    // DDL) is available.
-    new SqlNewHadoopRDD(
-      sc = sqlContext.sparkContext,
-      broadcastedConf = broadcastedConf,
-      initDriverSideJobFuncOpt = Some(setInputPaths),
-      initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
-      inputFormatClass = classOf[FilteringParquetRowInputFormat],
-      keyClass = classOf[Void],
-      valueClass = classOf[Row]) {
-
-      val cacheMetadata = useMetadataCache
-
-      @transient val cachedStatuses = inputFiles.map { f =>
-        // In order to encode the authority of a Path containing special characters such as /,
-        // we need to use the string returned by the URI of the path to create a new Path.
-        val pathWithAuthority = new Path(f.getPath.toUri.toString)
-
-        new FileStatus(
-          f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
-          f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
-      }.toSeq
-
-      @transient val cachedFooters = footers.map { f =>
-        // In order to encode the authority of a Path containing special characters such as /,
-        // we need to use the string returned by the URI of the path to create a new Path.
-        new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
-      }.toSeq
-
-      // Overridden so we can inject our own cached files statuses.
-      override def getPartitions: Array[SparkPartition] = {
-        val inputFormat = if (cacheMetadata) {
-          new FilteringParquetRowInputFormat {
-            override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
-
-            override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
-          }
-        } else {
-          new FilteringParquetRowInputFormat
-        }
-
-        val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
-        val rawSplits = inputFormat.getSplits(jobContext)
-
-        Array.tabulate[SparkPartition](rawSplits.size) { i =>
-          new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
-        }
-      }
-    }.values
+    Utils.withDummyCallSite(sqlContext.sparkContext) {
+      // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
+      // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects
+      // and footers. Especially when a global arbitrative schema (either from metastore or data
+      // source DDL) is available.
+      new SqlNewHadoopRDD(
+        sc = sqlContext.sparkContext,
+        broadcastedConf = broadcastedConf,
+        initDriverSideJobFuncOpt = Some(setInputPaths),
+        initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
+        inputFormatClass = classOf[FilteringParquetRowInputFormat],
+        keyClass = classOf[Void],
+        valueClass = classOf[Row]) {
+
+        val cacheMetadata = useMetadataCache
+
+        @transient val cachedStatuses = inputFiles.map { f =>
+          // In order to encode the authority of a Path containing special characters such as /,
+          // we need to use the string returned by the URI of the path to create a new Path.
+          val pathWithAuthority = new Path(f.getPath.toUri.toString)
+
+          new FileStatus(
+            f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
+            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
+        }.toSeq
+
+        @transient val cachedFooters = footers.map { f =>
+          // In order to encode the authority of a Path containing special characters such as /,
+          // we need to use the string returned by the URI of the path to create a new Path.
+          new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
+        }.toSeq
+
+        // Overridden so we can inject our own cached files statuses.
+        override def getPartitions: Array[SparkPartition] = {
+          val inputFormat = if (cacheMetadata) {
+            new FilteringParquetRowInputFormat {
+              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
+              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+            }
+          } else {
+            new FilteringParquetRowInputFormat
+          }
+
+          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
+          val rawSplits = inputFormat.getSplits(jobContext)
+
+          Array.tabulate[SparkPartition](rawSplits.size) { i =>
+            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+          }
+        }
+      }.values
+    }
   }
 
   private class MetadataCache {

Changes in org.apache.spark.sql.sources.DataSourceStrategy:

@@ -17,9 +17,9 @@
 package org.apache.spark.sql.sources
 
-import org.apache.spark.{SerializableWritable, Logging}
+import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.{StringType, StructType, UTF8String}
 import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
+import org.apache.spark.util.Utils
 
 /**
  * A Strategy for planning scans over data sources defined using the sources API.
@@ -197,7 +198,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         }
       }
 
-      dataRows.mapPartitions { iterator =>
+      // Since we know for sure that this closure is serializable, we can avoid the overhead
+      // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
+      // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
+      val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[Row]) => {
         val dataTypes = requiredColumns.map(schema(_).dataType)
         val mutableRow = new SpecificMutableRow(dataTypes)
         iterator.map { dataRow =>
@@ -209,6 +213,14 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
           mutableRow.asInstanceOf[expressions.Row]
         }
       }
+
+      // This is an internal RDD whose call site the user should not be concerned with
+      // Since we create many of these (one per partition), the time spent on computing
+      // the call site may add up.
+      Utils.withDummyCallSite(dataRows.sparkContext) {
+        new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
+      }
     } else {
       dataRows
     }

Changes in SqlNewHadoopRDD:

@@ -75,10 +75,6 @@ private[sql] class SqlNewHadoopRDD[K, V](
   with SparkHadoopMapReduceUtil
   with Logging {
 
-  if (initLocalJobFuncOpt.isDefined) {
-    sc.clean(initLocalJobFuncOpt.get)
-  }
-
   protected def getJob(): Job = {
     val conf: Configuration = broadcastedConf.value.value
     // "new Job" will make a copy of the conf. Then, it is