From 5287eec5a6948c0c6e0baaebf35f512324c0679a Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 21 May 2015 14:33:11 -0700 Subject: [PATCH] [SPARK-7718] [SQL] Speed up partitioning by avoiding closure cleaning According to yhuai we spent 6-7 seconds cleaning closures in a partitioning job that takes 12 seconds. Since we provide these closures in Spark we know for sure they are serializable, so we can bypass the cleaning. Author: Andrew Or Closes #6256 from andrewor14/sql-partition-speed-up and squashes the following commits: a82b451 [Andrew Or] Fix style 10f7e3e [Andrew Or] Avoid getting call sites and cleaning closures 17e2943 [Andrew Or] Merge branch 'master' of github.com:apache/spark into sql-partition-speed-up 523f042 [Andrew Or] Skip unnecessary Utils.getCallSites too f7fe143 [Andrew Or] Avoid unnecessary closure cleaning --- .../scala/org/apache/spark/util/Utils.scala | 18 ++++ .../apache/spark/sql/parquet/newParquet.scala | 88 ++++++++++--------- .../sql/sources/DataSourceStrategy.scala | 18 +++- .../spark/sql/sources/SqlNewHadoopRDD.scala | 4 - 4 files changed, 78 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 6a7d1fae33..b7a2473dfe 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2201,6 +2201,24 @@ private[spark] object Utils extends Logging { shutdownHooks.remove(ref) } + /** + * To avoid calling `Utils.getCallSite` for every single RDD we create in the body, + * set a dummy call site that RDDs use instead. This is for performance optimization. + */ + def withDummyCallSite[T](sc: SparkContext)(body: => T): T = { + val oldShortCallSite = sc.getLocalProperty(CallSite.SHORT_FORM) + val oldLongCallSite = sc.getLocalProperty(CallSite.LONG_FORM) + try { + sc.setLocalProperty(CallSite.SHORT_FORM, "") + sc.setLocalProperty(CallSite.LONG_FORM, "") + body + } finally { + // Restore the old ones here + sc.setLocalProperty(CallSite.SHORT_FORM, oldShortCallSite) + sc.setLocalProperty(CallSite.LONG_FORM, oldLongCallSite) + } + } + } private [util] class SparkShutdownHookManager { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 32986aa3ec..cb1e60883d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -33,6 +33,7 @@ import parquet.hadoop._ import parquet.hadoop.metadata.CompressionCodecName import parquet.hadoop.util.ContextUtil +import org.apache.spark.{Partition => SparkPartition, SerializableWritable, Logging, SparkException} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD._ @@ -40,7 +41,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.{Row, SQLConf, SQLContext} -import org.apache.spark.{Partition => SparkPartition, SparkEnv, SerializableWritable, Logging, SparkException} +import org.apache.spark.util.Utils private[sql] class DefaultSource extends HadoopFsRelationProvider { override def createRelation( @@ -264,57 +265,58 @@ private[sql] class ParquetRelation2( val footers = inputFiles.map(f => metadataCache.footers(f.getPath)) - // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`. - // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and - // footers. Especially when a global arbitrative schema (either from metastore or data source - // DDL) is available. - new SqlNewHadoopRDD( - sc = sqlContext.sparkContext, - broadcastedConf = broadcastedConf, - initDriverSideJobFuncOpt = Some(setInputPaths), - initLocalJobFuncOpt = Some(initLocalJobFuncOpt), - inputFormatClass = classOf[FilteringParquetRowInputFormat], - keyClass = classOf[Void], - valueClass = classOf[Row]) { + Utils.withDummyCallSite(sqlContext.sparkContext) { + // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`. + // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects + // and footers. Especially when a global arbitrative schema (either from metastore or data + // source DDL) is available. + new SqlNewHadoopRDD( + sc = sqlContext.sparkContext, + broadcastedConf = broadcastedConf, + initDriverSideJobFuncOpt = Some(setInputPaths), + initLocalJobFuncOpt = Some(initLocalJobFuncOpt), + inputFormatClass = classOf[FilteringParquetRowInputFormat], + keyClass = classOf[Void], + valueClass = classOf[Row]) { - val cacheMetadata = useMetadataCache + val cacheMetadata = useMetadataCache - @transient val cachedStatuses = inputFiles.map { f => - // In order to encode the authority of a Path containing special characters such as /, - // we need to use the string returned by the URI of the path to create a new Path. - val pathWithAuthority = new Path(f.getPath.toUri.toString) + @transient val cachedStatuses = inputFiles.map { f => + // In order to encode the authority of a Path containing special characters such as /, + // we need to use the string returned by the URI of the path to create a new Path. + val pathWithAuthority = new Path(f.getPath.toUri.toString) - new FileStatus( - f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime, - f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority) - }.toSeq + new FileStatus( + f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime, + f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority) + }.toSeq - @transient val cachedFooters = footers.map { f => - // In order to encode the authority of a Path containing special characters such as /, - // we need to use the string returned by the URI of the path to create a new Path. - new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata) - }.toSeq + @transient val cachedFooters = footers.map { f => + // In order to encode the authority of a Path containing special characters such as /, + // we need to use the string returned by the URI of the path to create a new Path. + new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata) + }.toSeq - // Overridden so we can inject our own cached files statuses. - override def getPartitions: Array[SparkPartition] = { - val inputFormat = if (cacheMetadata) { - new FilteringParquetRowInputFormat { - override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses - - override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters + // Overridden so we can inject our own cached files statuses. + override def getPartitions: Array[SparkPartition] = { + val inputFormat = if (cacheMetadata) { + new FilteringParquetRowInputFormat { + override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses + override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters + } + } else { + new FilteringParquetRowInputFormat } - } else { - new FilteringParquetRowInputFormat - } - val jobContext = newJobContext(getConf(isDriverSide = true), jobId) - val rawSplits = inputFormat.getSplits(jobContext) + val jobContext = newJobContext(getConf(isDriverSide = true), jobId) + val rawSplits = inputFormat.getSplits(jobContext) - Array.tabulate[SparkPartition](rawSplits.size) { i => - new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) + Array.tabulate[SparkPartition](rawSplits.size) { i => + new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) + } } - } - }.values + }.values + } } private class MetadataCache { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index 550090d22d..c03649d00b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql.sources -import org.apache.spark.{SerializableWritable, Logging} +import org.apache.spark.{Logging, SerializableWritable, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.rdd.{RDD, UnionRDD} +import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD} import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.{StringType, StructType, UTF8String} import org.apache.spark.sql.{SaveMode, Strategy, execution, sources} +import org.apache.spark.util.Utils /** * A Strategy for planning scans over data sources defined using the sources API. @@ -197,7 +198,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { } } - dataRows.mapPartitions { iterator => + // Since we know for sure that this closure is serializable, we can avoid the overhead + // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally + // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718). + val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[Row]) => { val dataTypes = requiredColumns.map(schema(_).dataType) val mutableRow = new SpecificMutableRow(dataTypes) iterator.map { dataRow => @@ -209,6 +213,14 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { mutableRow.asInstanceOf[expressions.Row] } } + + // This is an internal RDD whose call site the user should not be concerned with + // Since we create many of these (one per partition), the time spent on computing + // the call site may add up. + Utils.withDummyCallSite(dataRows.sparkContext) { + new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false) + } + } else { dataRows } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala index 0c7bb6e50c..a74a98631d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala @@ -75,10 +75,6 @@ private[sql] class SqlNewHadoopRDD[K, V]( with SparkHadoopMapReduceUtil with Logging { - if (initLocalJobFuncOpt.isDefined) { - sc.clean(initLocalJobFuncOpt.get) - } - protected def getJob(): Job = { val conf: Configuration = broadcastedConf.value.value // "new Job" will make a copy of the conf. Then, it is