[SPARK-22162] Executors and the driver should use consistent JobIDs in the RDD commit protocol

I have modified SparkHadoopWriter so that executors and the driver always use consistent JobIds during the Hadoop commit. Before SPARK-18191, Spark always used the rddId; the variable was just misleadingly named stageId. After SPARK-18191, the driver used the rddId as the jobId while the executors used the stageId, so the two sides could disagree. With this change, the driver and the executors consistently use the rddId as the jobId. In addition, during the Hadoop commit protocol Spark now uses the actual stageId when asking the OutputCommitCoordinator whether a task attempt may commit its output, instead of the executors' jobId as before.
Beyond the existing unit tests, a new test checks that executors and the driver use the same JobId during the commit. It failed before this change and passes with the fix applied.
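The invariant this restores, and what the new test asserts, is that the Hadoop JobID built on the driver for setupJob/commitJob is exactly the JobID each task attempt sees in setupTask/commitTask. A minimal standalone sketch of that invariant using Hadoop's mapreduce ID classes (the object name, the literal 42, and the TaskType choice are illustrative, not Spark code):

    import java.text.SimpleDateFormat
    import java.util.{Date, Locale}

    import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}

    object JobIdConsistencySketch {
      def main(args: Array[String]): Unit = {
        // Both sides must derive the JobID from the same numeric id; after this change
        // that id is rdd.id (commitJobId), which the driver knows before the job runs.
        val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())
        val commitJobId = 42 // stands in for rdd.id

        // Driver side: the job-level context is created from this JobID.
        val driverJobId = new JobID(jobTrackerId, commitJobId)

        // Executor side: every task attempt id nests a JobID. If it were built from the
        // stage id instead, it would not match the driver's JobID above.
        val taskAttemptId = new TaskAttemptID(
          new TaskID(new JobID(jobTrackerId, commitJobId), TaskType.REDUCE, 0 /* partition */),
          0 /* attempt */)

        assert(taskAttemptId.getJobID == driverJobId)
      }
    }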

Author: Reza Safi <rezasafi@cloudera.com>

Closes #19848 from rezasafi/stagerddsimple.
Reza Safi 2017-12-04 09:23:48 -08:00 committed by Marcelo Vanzin
parent 3927bb9b46
commit f81401e1cb
3 changed files with 53 additions and 8 deletions

core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala

@@ -60,17 +60,17 @@ object SparkHadoopWriter extends Logging {
       config: HadoopWriteConfigUtil[K, V]): Unit = {
     // Extract context and configuration from RDD.
     val sparkContext = rdd.context
-    val stageId = rdd.id
+    val commitJobId = rdd.id

     // Set up a job.
     val jobTrackerId = createJobTrackerID(new Date())
-    val jobContext = config.createJobContext(jobTrackerId, stageId)
+    val jobContext = config.createJobContext(jobTrackerId, commitJobId)
     config.initOutputFormat(jobContext)

     // Assert the output format/key/value class is set in JobConf.
     config.assertConf(jobContext, rdd.conf)

-    val committer = config.createCommitter(stageId)
+    val committer = config.createCommitter(commitJobId)
     committer.setupJob(jobContext)

     // Try to write all RDD partitions as a Hadoop OutputFormat.
@@ -80,7 +80,7 @@ object SparkHadoopWriter extends Logging {
           context = context,
           config = config,
           jobTrackerId = jobTrackerId,
-          sparkStageId = context.stageId,
+          commitJobId = commitJobId,
           sparkPartitionId = context.partitionId,
           sparkAttemptNumber = context.attemptNumber,
           committer = committer,
@@ -102,14 +102,14 @@ object SparkHadoopWriter extends Logging {
       context: TaskContext,
       config: HadoopWriteConfigUtil[K, V],
       jobTrackerId: String,
-      sparkStageId: Int,
+      commitJobId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[(K, V)]): TaskCommitMessage = {
     // Set up a task.
     val taskContext = config.createTaskAttemptContext(
-      jobTrackerId, sparkStageId, sparkPartitionId, sparkAttemptNumber)
+      jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
     committer.setupTask(taskContext)

     val (outputMetrics, callback) = initHadoopOutputMetrics(context)
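For context on the change above: the commitJobId threaded into executeTask ends up inside the Hadoop TaskAttemptContext that the committer operates on. A hedged sketch of what a createTaskAttemptContext-style helper has to produce (this mirrors the shape of Spark's HadoopWriteConfigUtil implementations but is not their code; newTaskAttemptContext is an illustrative name):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

    object TaskAttemptContextSketch {
      // The JobID baked into every task attempt is derived from commitJobId, so it is the
      // same value on the driver and on every executor, regardless of the stage id.
      def newTaskAttemptContext(
          conf: Configuration,
          jobTrackerId: String,
          commitJobId: Int,
          partitionId: Int,
          attemptNumber: Int): TaskAttemptContextImpl = {
        val attemptId = new TaskAttemptID(
          new TaskID(new JobID(jobTrackerId, commitJobId), TaskType.REDUCE, partitionId),
          attemptNumber)
        new TaskAttemptContextImpl(conf, attemptId)
      }
    }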

core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala

@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+        val stageId = TaskContext.get().stageId()
+        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)

         if (canCommit) {
           performCommit()
@@ -80,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
           logInfo(message)
           // We need to abort the task so that the driver can reschedule new attempts, if necessary
           committer.abortTask(mrTaskContext)
-          throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
+          throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
         }
       } else {
         // Speculation is disabled or a user has chosen to manually bypass the commit coordination
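On the coordination side, the driver's OutputCommitCoordinator authorizes at most one task attempt per partition of a given stage, so the executor has to key canCommit by the real stage id now that its jobId is the rddId. A toy model of that bookkeeping (ToyCommitCoordinator is a hypothetical illustration, not Spark's OutputCommitCoordinator API):

    import scala.collection.mutable

    class ToyCommitCoordinator {
      // (stageId, partitionId) -> attempt number that was granted permission to commit
      private val authorized = mutable.Map.empty[(Int, Int), Int]

      def canCommit(stageId: Int, partitionId: Int, attemptNumber: Int): Boolean = synchronized {
        authorized.get((stageId, partitionId)) match {
          case Some(winner) => winner == attemptNumber // only the first winner may commit
          case None =>
            authorized((stageId, partitionId)) = attemptNumber
            true
        }
      }
    }

Keyed this way, speculative attempts for the same partition of the same stage race for a single slot, which matches what the driver actually tracks.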

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{Job => NewJob, JobContext => NewJobContext,
   OutputCommitter => NewOutputCommitter, OutputFormat => NewOutputFormat,
   RecordWriter => NewRecordWriter, TaskAttemptContext => NewTaskAttempContext}
 import org.apache.hadoop.util.Progressable
+import org.scalatest.Assertions

 import org.apache.spark._
 import org.apache.spark.Partitioner
@@ -524,6 +525,15 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }

+  test("The JobId on the driver and executors should be the same during the commit") {
+    // Create more than one rdd to mimic stageId not equal to rddId
+    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2)
+      .map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }
+      .filter { p => p._1 > 0 }
+    pairs.saveAsNewAPIHadoopFile[YetAnotherFakeFormat]("ignored")
+    assert(JobID.jobid != -1)
+  }
+
   test("saveAsHadoopFile should respect configured output committers") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
     val conf = new JobConf()
@@ -908,6 +918,40 @@ class NewFakeFormatWithCallback() extends NewFakeFormat {
   }
 }

+class YetAnotherFakeCommitter extends NewOutputCommitter with Assertions {
+  def setupJob(j: NewJobContext): Unit = {
+    JobID.jobid = j.getJobID().getId
+  }
+
+  def needsTaskCommit(t: NewTaskAttempContext): Boolean = false
+
+  def setupTask(t: NewTaskAttempContext): Unit = {
+    val jobId = t.getTaskAttemptID().getJobID().getId
+    assert(jobId === JobID.jobid)
+  }
+
+  def commitTask(t: NewTaskAttempContext): Unit = {}
+
+  def abortTask(t: NewTaskAttempContext): Unit = {}
+}
+
+class YetAnotherFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+
+  def checkOutputSpecs(j: NewJobContext): Unit = {}
+
+  def getRecordWriter(t: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+    new NewFakeWriter()
+  }
+
+  def getOutputCommitter(t: NewTaskAttempContext): NewOutputCommitter = {
+    new YetAnotherFakeCommitter()
+  }
+}
+
+object JobID {
+  var jobid = -1
+}
+
 class ConfigTestFormat() extends NewFakeFormat() with Configurable {

   var setConfCalled = false