From 33334e2728f8d2e4cf7d542049435b589ed05a5e Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Thu, 14 Feb 2019 08:25:33 -0800
Subject: [PATCH] [SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs.

## What changes were proposed in this pull request?

Updates FileFormatWriter to create a consistent Hadoop Job ID for a write.

## How was this patch tested?

Existing tests for regressions.

Closes #23777 from rdblue/SPARK-26873-fix-file-format-writer-job-ids.

Authored-by: Ryan Blue
Signed-off-by: Marcelo Vanzin
---
 .../spark/sql/execution/datasources/FileFormatWriter.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index a8d24ceaf6..a9de64908c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -162,12 +162,14 @@ object FileFormatWriter extends Logging {
       rdd
     }
 
+    val jobIdInstant = new Date().getTime
     val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
     sparkSession.sparkContext.runJob(
       rddWithNonEmptyPartitions,
       (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
         executeTask(
           description = description,
+          jobIdInstant = jobIdInstant,
           sparkStageId = taskContext.stageId(),
           sparkPartitionId = taskContext.partitionId(),
           sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
@@ -200,13 +202,14 @@ object FileFormatWriter extends Logging {
   /** Writes data out in a single Spark task. */
   private def executeTask(
       description: WriteJobDescription,
+      jobIdInstant: Long,
       sparkStageId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[InternalRow]): WriteTaskResult = {
 
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
+    val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
     val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
 
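
For context, the sketch below (not part of the patch, and not Spark's actual `SparkHadoopWriterUtils` code) illustrates why sharing a single instant matters. It assumes a `createJobId`-style helper that, like Hadoop job tracker IDs, encodes the timestamp at second granularity; `JobIdSketch` and its names are hypothetical.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

// Hypothetical stand-in for SparkHadoopWriterUtils.createJobID: a minimal
// sketch assuming the job tracker ID encodes the timestamp at second
// granularity, as Hadoop-style job IDs conventionally do.
object JobIdSketch {
  def createJobId(time: Date, stageId: Int): String = {
    // Format the instant down to seconds; two `new Date` calls that
    // straddle a second boundary therefore produce different IDs.
    val trackerId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
    s"job_${trackerId}_$stageId"
  }

  def main(args: Array[String]): Unit = {
    // Before the patch: each task built its own Date, so tasks of the same
    // write could land on different seconds and derive different job IDs.
    val taskAJobId = createJobId(new Date(), stageId = 0)
    Thread.sleep(1500) // simulate tasks starting at different times
    val taskBJobId = createJobId(new Date(), stageId = 0)
    println(s"independent dates: $taskAJobId vs $taskBJobId") // may differ

    // After the patch: the driver captures one instant and every task
    // rebuilds its Date from it, so all tasks share the same job ID.
    val jobIdInstant = new Date().getTime
    val taskCJobId = createJobId(new Date(jobIdInstant), stageId = 0)
    Thread.sleep(1500)
    val taskDJobId = createJobId(new Date(jobIdInstant), stageId = 0)
    println(s"shared instant: $taskCJobId == $taskDJobId") // always equal
  }
}
```

Capturing `new Date().getTime` once on the driver and rebuilding the `Date` inside each task is what the patch does: every task derives the same job tracker ID regardless of when it is scheduled.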