diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 274a8edb0c..63310be22c 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1466,37 +1466,6 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
   support.
   </td>
 </tr>
-<tr>
-  <td><code>spark.sql.parquet.output.committer.class</code></td>
-  <td><code>org.apache.parquet.hadoop.<br />ParquetOutputCommitter</code></td>
-  <td>
-    <p>
-      The output committer class used by Parquet. The specified class needs to be a subclass of
-      <code>org.apache.hadoop.<br />mapreduce.OutputCommitter</code>. Typically, it's also a
-      subclass of <code>org.apache.parquet.hadoop.ParquetOutputCommitter</code>.
-    </p>
-    <p>
-      <b>Note:</b>
-      <ul>
-        <li>
-          This option is automatically ignored if <code>spark.speculation</code> is turned on.
-        </li>
-        <li>
-          This option must be set via Hadoop <code>Configuration</code> rather than Spark
-          <code>SQLConf</code>.
-        </li>
-        <li>
-          This option overrides <code>spark.sql.sources.outputCommitterClass</code>.
-        </li>
-      </ul>
-    </p>
-    <p>
-      Spark SQL comes with a builtin
-      <code>org.apache.spark.sql.<br />parquet.DirectParquetOutputCommitter</code>, which can be more
-      efficient then the default Parquet output committer when writing data to S3.
-    </p>
-  </td>
-</tr>
 <tr>
   <td><code>spark.sql.parquet.mergeSchema</code></td>
   <td>false</td>
@@ -2165,8 +2134,6 @@ options.
  - In the `sql` dialect, floating point numbers are now parsed as decimal. HiveQL parsing remains
    unchanged.
  - The canonical name of SQL/DataFrame functions are now lower case (e.g. sum vs SUM).
- - It has been determined that using the DirectOutputCommitter when speculation is enabled is unsafe
-   and thus this output committer will not be used when speculation is on, independent of configuration.
  - JSON data source will not automatically load new files that are created by other applications
    (i.e. files that are not inserted to the dataset through Spark SQL).
    For a JSON persistent table (i.e. the metadata of the table is stored in Hive Metastore),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 233ac263aa..f6b7f0854b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -129,16 +129,17 @@ private[sql] abstract class BaseWriterContainer(
       outputWriterFactory.newInstance(path, bucketId, dataSchema, taskAttemptContext)
     } catch {
       case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
-        if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
-          // Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
+        if (outputCommitter.getClass.getName.contains("Direct")) {
+          // SPARK-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
           // attempts, the task will fail because the output file is created from a prior attempt.
           // This often means the most visible error to the user is misleading. Augment the error
           // to tell the user to look for the actual error.
           throw new SparkException("The output file already exists but this could be due to a " +
             "failure from an earlier attempt. Look through the earlier logs or stage page for " +
-            "the first error.\n File exists error: " + e)
+            "the first error.\n File exists error: " + e.getLocalizedMessage, e)
+        } else {
+          throw e
         }
-        throw e
     }
   }
@@ -156,15 +157,6 @@ private[sql] abstract class BaseWriterContainer(
         s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
           "for appending.")
       defaultOutputCommitter
-    } else if (speculationEnabled) {
-      // When speculation is enabled, it's not safe to use customized output committer classes,
-      // especially direct output committers (e.g. `DirectParquetOutputCommitter`).
-      //
-      // See SPARK-9899 for more details.
-      logInfo(
-        s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
-          "because spark.speculation is configured to be true.")
-      defaultOutputCommitter
     } else {
       val configuration = context.getConfiguration
       val committerClass = configuration.getClass(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
deleted file mode 100644
index ecadb9e7c6..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-import org.apache.parquet.Log
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
-import org.apache.parquet.hadoop.util.ContextUtil
-
-/**
- * An output committer for writing Parquet files. In stead of writing to the `_temporary` folder
- * like what [[ParquetOutputCommitter]] does, this output committer writes data directly to the
- * destination folder. This can be useful for data stored in S3, where directory operations are
- * relatively expensive.
- *
- * To enable this output committer, users may set the "spark.sql.parquet.output.committer.class"
- * property via Hadoop [[Configuration]]. Not that this property overrides
- * "spark.sql.sources.outputCommitterClass".
- *
- * *NOTE*
- *
- *   NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's
- *   no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
- *   left empty).
- */
-private[datasources] class DirectParquetOutputCommitter(
-    outputPath: Path, context: TaskAttemptContext)
-  extends ParquetOutputCommitter(outputPath, context) {
-  val LOG = Log.getLog(classOf[ParquetOutputCommitter])
-
-  override def getWorkPath: Path = outputPath
-  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
-  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
-  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
-  override def setupJob(jobContext: JobContext): Unit = {}
-  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
-
-  override def commitJob(jobContext: JobContext) {
-    val configuration = ContextUtil.getConfiguration(jobContext)
-    val fileSystem = outputPath.getFileSystem(configuration)
-
-    if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
-      try {
-        val outputStatus = fileSystem.getFileStatus(outputPath)
-        val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
-        try {
-          ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
-        } catch { case e: Exception =>
-          LOG.warn("could not write summary file for " + outputPath, e)
-          val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
-          if (fileSystem.exists(metadataPath)) {
-            fileSystem.delete(metadataPath, true)
-          }
-        }
-      } catch {
-        case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
-      }
-    }
-
-    if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
-      try {
-        val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
-        fileSystem.create(successPath).close()
-      } catch {
-        case e: Exception => LOG.warn("could not write success file for " + outputPath, e)
-      }
-    }
-  }
-}
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index a2fd8da782..5ad95e4b9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -76,13 +76,6 @@ private[sql] class DefaultSource
 
     val conf = ContextUtil.getConfiguration(job)
 
-    // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
-    val committerClassName = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
-    if (committerClassName == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
-      conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
-        classOf[DirectParquetOutputCommitter].getCanonicalName)
-    }
-
     val committerClass =
       conf.getClass(
         SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index a3017258d6..581095d3dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -445,55 +445,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  testQuietly("SPARK-6352 DirectParquetOutputCommitter") {
-    val clonedConf = new Configuration(hadoopConfiguration)
-
-    // Write to a parquet file and let it fail.
-    // _temporary should be missing if direct output committer works.
-    try {
-      hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
-        classOf[DirectParquetOutputCommitter].getCanonicalName)
-      sqlContext.udf.register("div0", (x: Int) => x / 0)
-      withTempPath { dir =>
-        intercept[org.apache.spark.SparkException] {
-          sqlContext.sql("select div0(1) as div0").write.parquet(dir.getCanonicalPath)
-        }
-        val path = new Path(dir.getCanonicalPath, "_temporary")
-        val fs = path.getFileSystem(hadoopConfiguration)
-        assert(!fs.exists(path))
-      }
-    } finally {
-      // Hadoop 1 doesn't have `Configuration.unset`
-      hadoopConfiguration.clear()
-      clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
-    }
-  }
-
-  testQuietly("SPARK-9849 DirectParquetOutputCommitter qualified name backwards compatibility") {
-    val clonedConf = new Configuration(hadoopConfiguration)
-
-    // Write to a parquet file and let it fail.
-    // _temporary should be missing if direct output committer works.
-    try {
-      hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
-        "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
-      sqlContext.udf.register("div0", (x: Int) => x / 0)
-      withTempPath { dir =>
-        intercept[org.apache.spark.SparkException] {
-          sqlContext.sql("select div0(1) as div0").write.parquet(dir.getCanonicalPath)
-        }
-        val path = new Path(dir.getCanonicalPath, "_temporary")
-        val fs = path.getFileSystem(hadoopConfiguration)
-        assert(!fs.exists(path))
-      }
-    } finally {
-      // Hadoop 1 doesn't have `Configuration.unset`
-      hadoopConfiguration.clear()
-      clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
-    }
-  }
-
   test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
     withTempPath { dir =>
       val clonedConf = new Configuration(hadoopConfiguration)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index ea7e905742..10eeb30242 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -668,40 +668,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
     }
   }
-
-  test("SPARK-9899 Disable customized output committer when speculation is on") {
-    val clonedConf = new Configuration(hadoopConfiguration)
-    val speculationEnabled =
-      sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
-
-    try {
-      withTempPath { dir =>
-        // Enables task speculation
-        sqlContext.sparkContext.conf.set("spark.speculation", "true")
-
-        // Uses a customized output committer which always fails
-        hadoopConfiguration.set(
-          SQLConf.OUTPUT_COMMITTER_CLASS.key,
-          classOf[AlwaysFailOutputCommitter].getName)
-
-        // Code below shouldn't throw since customized output committer should be disabled.
-        val df = sqlContext.range(10).toDF().coalesce(1)
-        df.write.format(dataSourceName).save(dir.getCanonicalPath)
-        checkAnswer(
-          sqlContext
-            .read
-            .format(dataSourceName)
-            .option("dataSchema", df.schema.json)
-            .load(dir.getCanonicalPath),
-          df)
-      }
-    } finally {
-      // Hadoop 1 doesn't have `Configuration.unset`
-      hadoopConfiguration.clear()
-      clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
-      sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
-    }
-  }
 }
 
 // This class is used to test SPARK-8578. We should not use any custom output committer when
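
Usage note (editorial, not part of the patch): even with `DirectParquetOutputCommitter` removed, the
`spark.sql.parquet.output.committer.class` property is still read by the remaining
`conf.getClass(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key, ...)` lookup in `ParquetRelation`, so a job
can still plug in its own committer as long as the class is a subclass of
`org.apache.hadoop.mapreduce.OutputCommitter` and is safe under task retries. The sketch below only
illustrates how that property would be set via the Hadoop configuration, mirroring how the remaining
"SPARK-8121" test sets it; `configureParquetCommitter` is a hypothetical helper name, not an API
introduced by this change.

```scala
import org.apache.spark.sql.SQLContext

// Minimal sketch: point the Parquet write path at a user-supplied output committer class.
// The key is set on the Hadoop Configuration, the same mechanism the deleted scaladoc and
// the remaining ParquetIOSuite test use.
def configureParquetCommitter(sqlContext: SQLContext, committerClassName: String): Unit = {
  sqlContext.sparkContext.hadoopConfiguration
    .set("spark.sql.parquet.output.committer.class", committerClassName)
}
```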