[SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs.
## What changes were proposed in this pull request?

Updates FileFormatWriter to create a consistent Hadoop Job ID for a write.

## How was this patch tested?

Existing tests for regressions.

Closes #23777 from rdblue/SPARK-26873-fix-file-format-writer-job-ids.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
parent
2228ee51ce
commit
33334e2728
|
@@ -162,12 +162,14 @@ object FileFormatWriter extends Logging {
|
||||||
rdd
|
rdd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val jobIdInstant = new Date().getTime
|
||||||
val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
|
val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
|
||||||
sparkSession.sparkContext.runJob(
|
sparkSession.sparkContext.runJob(
|
||||||
rddWithNonEmptyPartitions,
|
rddWithNonEmptyPartitions,
|
||||||
(taskContext: TaskContext, iter: Iterator[InternalRow]) => {
|
(taskContext: TaskContext, iter: Iterator[InternalRow]) => {
|
||||||
executeTask(
|
executeTask(
|
||||||
description = description,
|
description = description,
|
||||||
|
jobIdInstant = jobIdInstant,
|
||||||
sparkStageId = taskContext.stageId(),
|
sparkStageId = taskContext.stageId(),
|
||||||
sparkPartitionId = taskContext.partitionId(),
|
sparkPartitionId = taskContext.partitionId(),
|
||||||
sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
|
sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
|
||||||
|
@@ -200,13 +202,14 @@ object FileFormatWriter extends Logging {
|
||||||
/** Writes data out in a single Spark task. */
|
/** Writes data out in a single Spark task. */
|
||||||
private def executeTask(
|
private def executeTask(
|
||||||
description: WriteJobDescription,
|
description: WriteJobDescription,
|
||||||
|
jobIdInstant: Long,
|
||||||
sparkStageId: Int,
|
sparkStageId: Int,
|
||||||
sparkPartitionId: Int,
|
sparkPartitionId: Int,
|
||||||
sparkAttemptNumber: Int,
|
sparkAttemptNumber: Int,
|
||||||
committer: FileCommitProtocol,
|
committer: FileCommitProtocol,
|
||||||
iterator: Iterator[InternalRow]): WriteTaskResult = {
|
iterator: Iterator[InternalRow]): WriteTaskResult = {
|
||||||
|
|
||||||
val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
|
val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
|
||||||
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
|
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
|
||||||
val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
|
val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue