Commit task outputs to Hadoop-supported storage systems in parallel on the
cluster instead of on the master. Fixes #110.
Matei Zaharia 2012-06-06 16:46:53 -07:00
parent 6ae2746d1e
commit 048276799a
2 changed files with 11 additions and 14 deletions
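In outline: runJob previously returned each task's HadoopWriter to the master, which then called commit() on every writer one after another, serializing the commit phase on a single machine; this change has every task commit its own output before it finishes, so commits run in parallel across the cluster. Below is a minimal, self-contained sketch of that task-side protocol. The Committer trait and commitOutput are hypothetical stand-ins for Hadoop's OutputCommitter (whose needsTaskCommit/commitTask/abortTask calls appear in the diff), not Spark or Hadoop API:

import java.io.IOException

object CommitSketch {
  // Hypothetical stand-in for Hadoop's OutputCommitter.
  trait Committer {
    def needsTaskCommit: Boolean
    def commitTask(): Unit   // promote the task's temp output to its final path
    def abortTask(): Unit    // discard the task's temp output
  }

  // Runs inside each task, so commits proceed in parallel across the cluster.
  def commitOutput(cmtr: Committer, taskId: String) {
    if (cmtr.needsTaskCommit) {
      try {
        cmtr.commitTask()
        println(taskId + ": Committed")
      } catch {
        case e: IOException =>
          cmtr.abortTask()   // roll back the partial output
          throw e            // fail the task so the scheduler can re-run it
      }
    } else {
      println("No need to commit output of task: " + taskId)
    }
  }
}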

core/src/main/scala/spark/HadoopWriter.scala

@@ -12,8 +12,8 @@ import java.io.IOException
 import java.net.URI
 import java.util.Date
 
-import spark.SerializableWritable
 import spark.Logging
+import spark.SerializableWritable
 
 /**
  * Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The JobConf should also
@@ -84,26 +84,23 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable {
     writer.close(Reporter.NULL)
   }
 
-  def commit(): Boolean = {
-    var result = false
+  def commit() {
     val taCtxt = getTaskContext()
     val cmtr = getOutputCommitter()
     if (cmtr.needsTaskCommit(taCtxt)) {
       try {
         cmtr.commitTask(taCtxt)
         logInfo (taID + ": Committed")
-        result = true
       } catch {
-        case e:IOException => {
-          logError ("Error committing the output of task: " + taID.value)
-          e.printStackTrace()
+        case e: IOException => {
+          logError("Error committing the output of task: " + taID.value, e)
           cmtr.abortTask(taCtxt)
+          throw e
         }
       }
-      return result
-    }
-    logWarning ("No need to commit output of task: " + taID.value)
-    return true
+    } else {
+      logWarning ("No need to commit output of task: " + taID.value)
+    }
   }
 
   def cleanup() {

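A design note on the hunk above: the old commit() caught IOException, printed a stack trace, and reported the outcome as a Boolean that its only caller (the .foreach(_.commit()) loop on the master, removed below) never inspected. The new version aborts the task attempt and rethrows, so a failed commit fails the task itself and Spark can re-run it. In signature form:

// before: errors swallowed; outcome returned as a Boolean nobody checked
def commit(): Boolean

// after: errors propagate, failing (and re-running) the task
def commit(): Unit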
core/src/main/scala/spark/PairRDDFunctions.scala

@@ -335,7 +335,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     val writer = new HadoopWriter(conf)
     writer.preSetup()
 
-    def writeToFile(context: TaskContext, iter: Iterator[(K,V)]): HadoopWriter = {
+    def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) {
       writer.setup(context.stageId, context.splitId, context.attemptId)
       writer.open()
 
@@ -347,10 +347,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       }
 
       writer.close()
-      return writer
+      writer.commit()
     }
 
-    self.context.runJob(self, writeToFile _ ).foreach(_.commit())
+    self.context.runJob(self, writeToFile _)
    writer.cleanup()
  }
 
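For context, a hedged usage sketch of the code path this commit touches, written against the 2012-era API: the "local" master string, output path, and data are placeholders, and the class-based saveAsHadoopFile overload is assumed from the PairRDDFunctions of the same vintage.

import org.apache.hadoop.mapred.TextOutputFormat
import spark.SparkContext
import spark.SparkContext._  // implicit conversion to PairRDDFunctions

object SaveExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SaveExample")
    val pairs = sc.parallelize(1 to 1000).map(i => (i.toString, "value" + i))
    // With this commit, each task writes and commits its own partition's
    // output inside writeToFile; the driver only calls writer.cleanup()
    // after runJob returns.
    pairs.saveAsHadoopFile("out", classOf[String], classOf[String],
      classOf[TextOutputFormat[String, String]])
    sc.stop()
  }
}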