diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index c9de45e0dd..e049d54bf5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} -import org.apache.spark.{Logging, SparkException, Partition => SparkPartition} +import org.apache.spark.{Logging, Partition => SparkPartition, SparkException} private[sql] class DefaultSource extends HadoopFsRelationProvider { override def createRelation( @@ -60,50 +60,21 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext extends OutputWriter { private val recordWriter: RecordWriter[Void, InternalRow] = { - val conf = context.getConfiguration val outputFormat = { - // When appending new Parquet files to an existing Parquet file directory, to avoid - // overwriting existing data files, we need to find out the max task ID encoded in these data - // file names. - // TODO Make this snippet a utility function for other data source developers - val maxExistingTaskId = { - // Note that `path` may point to a temporary location. Here we retrieve the real - // destination path from the configuration - val outputPath = new Path(conf.get("spark.sql.sources.output.path")) - val fs = outputPath.getFileSystem(conf) - - if (fs.exists(outputPath)) { - // Pattern used to match task ID in part file names, e.g.: - // - // part-r-00001.gz.parquet - // ^~~~~ - val partFilePattern = """part-.-(\d{1,}).*""".r - - fs.listStatus(outputPath).map(_.getPath.getName).map { - case partFilePattern(id) => id.toInt - case name if name.startsWith("_") => 0 - case name if name.startsWith(".") => 0 - case name => throw new AnalysisException( - s"Trying to write Parquet files to directory $outputPath, " + - s"but found items with illegal name '$name'.") - }.reduceOption(_ max _).getOrElse(0) - } else { - 0 - } - } - new ParquetOutputFormat[InternalRow]() { // Here we override `getDefaultWorkFile` for two reasons: // - // 1. To allow appending. We need to generate output file name based on the max available - // task ID computed above. + // 1. To allow appending. We need to generate unique output file names to avoid + // overwriting existing files (either exist before the write job, or are just written + // by other tasks within the same write job). // // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all // partitions in the case of dynamic partitioning. override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1 - new Path(path, f"part-r-$split%05d$extension") + val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID") + val split = context.getTaskAttemptID.getTaskID.getId + new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension") } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index c16bd9ae52..215e53c020 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -17,14 +17,13 @@ package org.apache.spark.sql.sources -import java.util.Date +import java.util.{Date, UUID} import scala.collection.mutable import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter => MapReduceFileOutputCommitter} -import org.apache.parquet.hadoop.util.ContextUtil +import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat} import org.apache.spark._ import org.apache.spark.mapred.SparkHadoopMapRedUtil @@ -59,6 +58,28 @@ private[sql] case class InsertIntoDataSource( } } +/** + * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending. + * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a + * single write job, and owns a UUID that identifies this job. Each concrete implementation of + * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for + * each task output file. This UUID is passed to executor side via a property named + * `spark.sql.sources.writeJobUUID`. + * + * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]] + * are used to write to normal tables and tables with dynamic partitions. + * + * Basic work flow of this command is: + * + * 1. Driver side setup, including output committer initialization and data source specific + * preparation work for the write job to be issued. + * 2. Issues a write job consists of one or more executor side tasks, each of which writes all + * rows within an RDD partition. + * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any + * exception is thrown during task commitment, also aborts that task. + * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is + * thrown during job commitment, also aborts the job. + */ private[sql] case class InsertIntoHadoopFsRelation( @transient relation: HadoopFsRelation, @transient query: LogicalPlan, @@ -261,7 +282,14 @@ private[sql] abstract class BaseWriterContainer( with Logging with Serializable { - protected val serializableConf = new SerializableConfiguration(ContextUtil.getConfiguration(job)) + protected val serializableConf = new SerializableConfiguration(job.getConfiguration) + + // This UUID is used to avoid output file name collision between different appending write jobs. + // These jobs may belong to different SparkContext instances. Concrete data source implementations + // may use this UUID to generate unique file names (e.g., `part-r--.parquet`). + // The reason why this ID is used to identify a job rather than a single task output file is + // that, speculative tasks must generate the same output file name as the original task. + private val uniqueWriteJobId = UUID.randomUUID() // This is only used on driver side. @transient private val jobContext: JobContext = job @@ -290,6 +318,11 @@ private[sql] abstract class BaseWriterContainer( setupIDs(0, 0, 0) setupConf() + // This UUID is sent to executor side together with the serialized `Configuration` object within + // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate + // unique task output files. + job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString) + // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor // clones the Configuration object passed in. If we initialize the TaskAttemptContext first, // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext. @@ -417,15 +450,16 @@ private[sql] class DefaultWriterContainer( assert(writer != null, "OutputWriter instance should have been initialized") writer.close() super.commitTask() - } catch { - case cause: Throwable => - super.abortTask() - throw new RuntimeException("Failed to commit task", cause) + } catch { case cause: Throwable => + // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will + // cause `abortTask()` to be invoked. + throw new RuntimeException("Failed to commit task", cause) } } override def abortTask(): Unit = { try { + // It's possible that the task fails before `writer` gets initialized if (writer != null) { writer.close() } @@ -469,21 +503,25 @@ private[sql] class DynamicPartitionWriterContainer( }) } - override def commitTask(): Unit = { - try { + private def clearOutputWriters(): Unit = { + if (outputWriters.nonEmpty) { outputWriters.values.foreach(_.close()) outputWriters.clear() + } + } + + override def commitTask(): Unit = { + try { + clearOutputWriters() super.commitTask() } catch { case cause: Throwable => - super.abortTask() throw new RuntimeException("Failed to commit task", cause) } } override def abortTask(): Unit = { try { - outputWriters.values.foreach(_.close()) - outputWriters.clear() + clearOutputWriters() } finally { super.abortTask() } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index 1e51173a19..e3ab9442b4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -27,13 +27,13 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.hive.HiveMetastoreTypes import org.apache.spark.sql.types.StructType -private[orc] object OrcFileOperator extends Logging{ +private[orc] object OrcFileOperator extends Logging { def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = { val conf = config.getOrElse(new Configuration) val fspath = new Path(pathStr) val fs = fspath.getFileSystem(conf) val orcFiles = listOrcFiles(pathStr, conf) - + logDebug(s"Creating ORC Reader from ${orcFiles.head}") // TODO Need to consider all files when schema evolution is taken into account. OrcFile.createReader(fs, orcFiles.head) } @@ -42,6 +42,7 @@ private[orc] object OrcFileOperator extends Logging{ val reader = getFileReader(path, conf) val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector] val schema = readerInspector.getTypeName + logDebug(s"Reading schema from file $path, got Hive schema string: $schema") HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType] } @@ -52,14 +53,14 @@ private[orc] object OrcFileOperator extends Logging{ def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = { val origPath = new Path(pathStr) val fs = origPath.getFileSystem(conf) - val path = origPath.makeQualified(fs) + val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory) val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath) .filterNot(_.isDir) .map(_.getPath) .filterNot(_.getName.startsWith("_")) .filterNot(_.getName.startsWith(".")) - if (paths == null || paths.size == 0) { + if (paths == null || paths.isEmpty) { throw new IllegalArgumentException( s"orcFileOperator: path $path does not have valid orc files matching the pattern") } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index dbce39f21d..705f48f1cd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, Reco import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.{HadoopRDD, RDD} @@ -39,7 +40,6 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreType import org.apache.spark.sql.sources.{Filter, _} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext} -import org.apache.spark.{Logging} import org.apache.spark.util.SerializableConfiguration /* Implicit conversions */ @@ -105,8 +105,9 @@ private[orc] class OrcOutputWriter( recordWriterInstantiated = true val conf = context.getConfiguration + val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID") val partition = context.getTaskAttemptID.getTaskID.getId - val filename = f"part-r-$partition%05d-${System.currentTimeMillis}%015d.orc" + val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc" new OrcOutputFormat().getRecordWriter( new Path(path, filename).getFileSystem(conf), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index f901bd8171..ea325cc93c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -49,7 +49,7 @@ import scala.collection.JavaConversions._ object TestHive extends TestHiveContext( new SparkContext( - System.getProperty("spark.sql.test.master", "local[2]"), + System.getProperty("spark.sql.test.master", "local[32]"), "TestSQLContext", new SparkConf() .set("spark.sql.test", "") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 82e08caf46..a0cdd0db42 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -43,8 +43,14 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { orcTableDir.mkdir() import org.apache.spark.sql.hive.test.TestHive.implicits._ + // Originally we were using a 10-row RDD for testing. However, when default parallelism is + // greater than 10 (e.g., running on a node with 32 cores), this RDD contains empty partitions, + // which result in empty ORC files. Unfortunately, ORC doesn't handle empty files properly and + // causes build failure on Jenkins, which happens to have 32 cores. Please refer to SPARK-8501 + // for more details. To workaround this issue before fixing SPARK-8501, we simply increase row + // number in this RDD to avoid empty partitions. sparkContext - .makeRDD(1 to 10) + .makeRDD(1 to 100) .map(i => OrcData(i, s"part-$i")) .toDF() .registerTempTable(s"orc_temp_table") @@ -70,35 +76,35 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { } test("create temporary orc table") { - checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10)) + checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100)) checkAnswer( sql("SELECT * FROM normal_orc_source"), - (1 to 10).map(i => Row(i, s"part-$i"))) + (1 to 100).map(i => Row(i, s"part-$i"))) checkAnswer( sql("SELECT * FROM normal_orc_source where intField > 5"), - (6 to 10).map(i => Row(i, s"part-$i"))) + (6 to 100).map(i => Row(i, s"part-$i"))) checkAnswer( sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"), - (1 to 10).map(i => Row(1, s"part-$i"))) + (1 to 100).map(i => Row(1, s"part-$i"))) } test("create temporary orc table as") { - checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10)) + checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100)) checkAnswer( sql("SELECT * FROM normal_orc_source"), - (1 to 10).map(i => Row(i, s"part-$i"))) + (1 to 100).map(i => Row(i, s"part-$i"))) checkAnswer( sql("SELECT * FROM normal_orc_source WHERE intField > 5"), - (6 to 10).map(i => Row(i, s"part-$i"))) + (6 to 100).map(i => Row(i, s"part-$i"))) checkAnswer( sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"), - (1 to 10).map(i => Row(1, s"part-$i"))) + (1 to 100).map(i => Row(1, s"part-$i"))) } test("appending insert") { @@ -106,7 +112,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { checkAnswer( sql("SELECT * FROM normal_orc_source"), - (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i => + (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i => Seq.fill(2)(Row(i, s"part-$i")) }) } @@ -119,7 +125,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { checkAnswer( sql("SELECT * FROM normal_orc_as_source"), - (6 to 10).map(i => Row(i, s"part-$i"))) + (6 to 100).map(i => Row(i, s"part-$i"))) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 0f959b3d0b..5d7cd16c12 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -53,9 +53,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW numberFormat.setGroupingUsed(false) override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID") val split = context.getTaskAttemptID.getTaskID.getId val name = FileOutputFormat.getOutputName(context) - new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}") + new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId") } } @@ -156,6 +157,7 @@ class CommitFailureTestRelation( context: TaskAttemptContext): OutputWriter = { new SimpleTextOutputWriter(path, context) { override def close(): Unit = { + super.close() sys.error("Intentional task commitment failure for testing purpose.") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 76469d7a3d..e0d8277a8e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -35,7 +35,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { import sqlContext.sql import sqlContext.implicits._ - val dataSourceName = classOf[SimpleTextSource].getCanonicalName + val dataSourceName: String val dataSchema = StructType( @@ -470,6 +470,33 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { checkAnswer(sqlContext.table("t"), df.select('b, 'c, 'a).collect()) } } + + // NOTE: This test suite is not super deterministic. On nodes with only relatively few cores + // (4 or even 1), it's hard to reproduce the data loss issue. But on nodes with for example 8 or + // more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this + // requirement. We probably want to move this test case to spark-integration-tests or spark-perf + // later. + test("SPARK-8406: Avoids name collision while writing Parquet files") { + withTempPath { dir => + val path = dir.getCanonicalPath + sqlContext + .range(10000) + .repartition(250) + .write + .mode(SaveMode.Overwrite) + .format(dataSourceName) + .save(path) + + assertResult(10000) { + sqlContext + .read + .format(dataSourceName) + .option("dataSchema", StructType(StructField("id", LongType) :: Nil).json) + .load(path) + .count() + } + } + } } class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { @@ -502,15 +529,17 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { } class CommitFailureTestRelationSuite extends SparkFunSuite with SQLTestUtils { - import TestHive.implicits._ - override val sqlContext = TestHive + // When committing a task, `CommitFailureTestSource` throws an exception for testing purpose. val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName test("SPARK-7684: commitTask() failure should fallback to abortTask()") { withTempPath { file => - val df = (1 to 3).map(i => i -> s"val_$i").toDF("a", "b") + // Here we coalesce partition number to 1 to ensure that only a single task is issued. This + // prevents race condition happened when FileOutputCommitter tries to remove the `_temporary` + // directory while committing/aborting the job. See SPARK-8513 for more details. + val df = sqlContext.range(0, 10).coalesce(1) intercept[SparkException] { df.write.format(dataSourceName).save(file.getCanonicalPath) }