[SPARK-8139] [SQL] Updates docs and comments of data sources and Parquet output committer options

This PR only applies to master branch (1.5.0-SNAPSHOT) since it references `org.apache.parquet` classes which only appear in Parquet 1.7.0.

Author: Cheng Lian <lian@databricks.com>

Closes #6683 from liancheng/output-committer-docs and squashes the following commits:

b4648b8 [Cheng Lian] Removes spark.sql.sources.outputCommitterClass as it's not a public option
ee63923 [Cheng Lian] Updates docs and comments of data sources and Parquet output committer options
Cheng Lian 2015-06-23 17:24:26 -07:00
parent 7fb5ae5024
commit 111d6b9b8a
4 changed files with 78 additions and 20 deletions


@@ -1348,6 +1348,34 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
support.
</td>
</tr>
+<tr>
+  <td><code>spark.sql.parquet.output.committer.class</code></td>
+  <td><code>org.apache.parquet.hadoop.<br />ParquetOutputCommitter</code></td>
+  <td>
+    <p>
+      The output committer class used by Parquet. The specified class needs to be a subclass of
+      <code>org.apache.hadoop.<br />mapreduce.OutputCommitter</code>. Typically, it's also a
+      subclass of <code>org.apache.parquet.hadoop.ParquetOutputCommitter</code>.
+    </p>
+    <p>
+      <b>Note:</b>
+      <ul>
+        <li>
+          This option must be set via Hadoop <code>Configuration</code> rather than Spark
+          <code>SQLConf</code>.
+        </li>
+        <li>
+          This option overrides <code>spark.sql.sources.<br />outputCommitterClass</code>.
+        </li>
+      </ul>
+    </p>
+    <p>
+      Spark SQL comes with a built-in
+      <code>org.apache.spark.sql.<br />parquet.DirectParquetOutputCommitter</code>, which can be
+      more efficient than the default Parquet output committer when writing data to S3.
+    </p>
+  </td>
+</tr>
</table>
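To make the note above concrete, here is a minimal sketch of enabling the built-in direct committer for S3 output, assuming a Spark 1.5-era `SparkContext`/`SQLContext` and hypothetical input/output paths:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("direct-committer-example"))
val sqlContext = new SQLContext(sc)

// The option must go into the Hadoop Configuration; setting it through
// sqlContext.setConf(...) would have no effect.
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

// Hypothetical paths; writing to S3 is where the direct committer helps most.
sqlContext.read.json("hdfs:///logs/events.json")
  .write.parquet("s3n://my-bucket/warehouse/events")
```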
## JSON Datasets
@@ -1876,7 +1904,7 @@ that these options will be deprecated in future release as more optimizations ar
Configures the number of partitions to use when shuffling data for joins or aggregations.
</td>
</tr>
<tr>
<td><code>spark.sql.planner.externalSort</code></td>
<td>false</td>
<td>

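Unlike the committer option above, the `spark.sql.shuffle.partitions` setting shown in this hunk is an ordinary SQLConf option; a one-line sketch, reusing the hypothetical `sqlContext` from the earlier example:

```scala
// Plain SQLConf option: applies to subsequent joins and aggregations.
sqlContext.setConf("spark.sql.shuffle.partitions", "400")
```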

@@ -22,6 +22,8 @@ import java.util.Properties
import scala.collection.immutable
import scala.collection.JavaConversions._
+import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.sql.catalyst.CatalystConf
private[spark] object SQLConf {
@@ -252,9 +254,9 @@ private[spark] object SQLConf {
val PARQUET_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.parquet.filterPushdown",
defaultValue = Some(false),
-    doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default" +
-      " because of a known bug in Paruet 1.6.0rc3 " +
-      "(<a href=\"https://issues.apache.org/jira/browse/PARQUET-136\">PARQUET-136</a>). However, " +
+    doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default " +
+      "because of a known bug in Parquet 1.6.0rc3 " +
+      "(PARQUET-136, https://issues.apache.org/jira/browse/PARQUET-136). However, " +
"if your table doesn't contain any nullable string or binary columns, it's still safe to " +
"turn this feature on.")
@@ -262,11 +264,21 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "<TODO>")
+  val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
+    key = "spark.sql.parquet.output.committer.class",
+    defaultValue = Some(classOf[ParquetOutputCommitter].getName),
+    doc = "The output committer class used by Parquet. The specified class needs to be a " +
+      "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
+      "of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " +
+      "option must be set in Hadoop Configuration. 2. This option overrides " +
+      "\"spark.sql.sources.outputCommitterClass\"."
+  )
val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
defaultValue = Some(false),
doc = "<TODO>")
-  val HIVE_VERIFY_PARTITIONPATH = booleanConf("spark.sql.hive.verifyPartitionPath",
+  val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath",
defaultValue = Some(true),
doc = "<TODO>")
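To see how this constant is meant to be consumed, here is a hypothetical sketch of the lookup a writer performs against the Hadoop `Configuration` (it mirrors the `ParquetRelation2` change at the end of this diff; `resolveCommitterClass` is an illustrative name, not Spark API):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.OutputCommitter
import org.apache.parquet.hadoop.ParquetOutputCommitter

// Falls back to ParquetOutputCommitter when the option is unset; the xface
// argument enforces the "must subclass OutputCommitter" requirement.
def resolveCommitterClass(conf: Configuration): Class[_ <: OutputCommitter] =
  conf.getClass(
    "spark.sql.parquet.output.committer.class",
    classOf[ParquetOutputCommitter],
    classOf[OutputCommitter])
```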
@@ -325,9 +337,13 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "<TODO>")
-  // The output committer class used by FSBasedRelation. The specified class needs to be a
+  // The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
-  // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
+  //
+  // NOTE:
+  //
+  // 1. Instead of SQLConf, this option *must be set in Hadoop Configuration*.
+  // 2. This option can be overridden by "spark.sql.parquet.output.committer.class".
val OUTPUT_COMMITTER_CLASS =
stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
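The precedence spelled out in the comment can be summarized in a short sketch (an illustrative helper, not Spark API): for Parquet output the Parquet-specific option wins, and the generic data source option is only consulted when the former is unset.

```scala
import org.apache.hadoop.conf.Configuration

// Illustrative only: which committer class name would take effect for Parquet.
def effectiveCommitterClassName(conf: Configuration): Option[String] =
  Option(conf.get("spark.sql.parquet.output.committer.class"))
    .orElse(Option(conf.get("spark.sql.sources.outputCommitterClass")))
```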
@@ -415,7 +431,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
/** When true, uses verifyPartitionPath to prune paths that do not exist. */
-  private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITIONPATH)
+  private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
/** When true the planner will use the external sort, which may spill to disk. */
private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)


@@ -17,19 +17,35 @@
package org.apache.spark.sql.parquet
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.Log
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
+/**
+ * An output committer for writing Parquet files. Instead of writing to the `_temporary` folder
+ * as [[ParquetOutputCommitter]] does, this output committer writes data directly to the
+ * destination folder. This can be useful for data stored in S3, where directory operations are
+ * relatively expensive.
+ *
+ * To enable this output committer, users may set the "spark.sql.parquet.output.committer.class"
+ * property via Hadoop [[Configuration]]. Note that this property overrides
+ * "spark.sql.sources.outputCommitterClass".
+ *
+ * *NOTE*
+ *
+ * NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there is
+ * no safe way to undo a failed appending job (that's why both `abortTask()` and `abortJob()`
+ * are left empty).
+ */
private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
extends ParquetOutputCommitter(outputPath, context) {
val LOG = Log.getLog(classOf[ParquetOutputCommitter])
-  override def getWorkPath(): Path = outputPath
+  override def getWorkPath: Path = outputPath
override def abortTask(taskContext: TaskAttemptContext): Unit = {}
override def commitTask(taskContext: TaskAttemptContext): Unit = {}
override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
@@ -46,13 +62,11 @@ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: T
val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
try {
ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
-      } catch {
-        case e: Exception => {
-          LOG.warn("could not write summary file for " + outputPath, e)
-          val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
-          if (fileSystem.exists(metadataPath)) {
-            fileSystem.delete(metadataPath, true)
-          }
-        }
+      } catch { case e: Exception =>
+        LOG.warn("could not write summary file for " + outputPath, e)
+        val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
+        if (fileSystem.exists(metadataPath)) {
+          fileSystem.delete(metadataPath, true)
+        }
}
} catch {

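Given the warning in the new scaladoc, a usage sketch of safe versus unsafe save modes with the direct committer (continuing the hypothetical `sqlContext` and paths from the earlier examples):

```scala
import org.apache.spark.sql.SaveMode

val df = sqlContext.read.parquet("hdfs:///staging/events")  // hypothetical input

// OK: a full (re)write goes straight to the destination folder, skipping _temporary.
df.write.mode(SaveMode.Overwrite).parquet("s3n://my-bucket/warehouse/events")

// UNSAFE: abortTask()/abortJob() are no-ops, so a failed append cannot be rolled back.
// df.write.mode(SaveMode.Append).parquet("s3n://my-bucket/warehouse/events")
```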

@@ -178,11 +178,11 @@ private[sql] class ParquetRelation2(
val committerClass =
conf.getClass(
"spark.sql.parquet.output.committer.class",
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
classOf[ParquetOutputCommitter],
classOf[ParquetOutputCommitter])
-    if (conf.get("spark.sql.parquet.output.committer.class") == null) {
+    if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
logInfo("Using default output committer for Parquet: " +
classOf[ParquetOutputCommitter].getCanonicalName)
} else {