Improved HadoopFileWriter (saves key and value classes to jobconf)

This commit is contained in:
Tathagata Das 2011-06-23 08:11:22 -07:00
parent b5e6645505
commit 3d2befe831

View file

@ -31,7 +31,7 @@ extends HadoopFileWriter [NullWritable, Text, TextOutputFormat[NullWritable, Tex
@serializable
class HadoopFileWriter [K, V, F <: OutputFormat[K,V], C <: OutputCommitter]
(path: String, @transient jobConf: JobConf)
(implicit fm: ClassManifest[F], cm: ClassManifest[C]) {
(implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F], cm: ClassManifest[C]) {
private val now = new Date()
private val conf = new SerializableWritable[JobConf](if (jobConf == null) new JobConf() else jobConf)
@ -49,7 +49,7 @@ class HadoopFileWriter [K, V, F <: OutputFormat[K,V], C <: OutputCommitter]
@transient private var jobContext: JobContext = null
@transient private var taskContext: TaskAttemptContext = null
def this (path: String)(implicit fm: ClassManifest[F], cm: ClassManifest[C]) = this(path, null)
def this (path: String)(implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F], cm: ClassManifest[C]) = this(path, null)
def preSetup() {
setIDs(0, 0, 0)
@ -78,9 +78,13 @@ class HadoopFileWriter [K, V, F <: OutputFormat[K,V], C <: OutputCommitter]
}
def write(key: K, value: V) {
if (writer!=null)
if (writer!=null) {
val key1 = key.asInstanceOf[Any]
val value1 = value.asInstanceOf[Any]
//println ("Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
println ("Writing ("+key.toString+ ", " + value.toString + ")")
writer.write(key, value)
else
} else
throw new IOException("Writer is null, open() has not been called")
}
@ -154,10 +158,12 @@ class HadoopFileWriter [K, V, F <: OutputFormat[K,V], C <: OutputCommitter]
if (!confProvided) {
conf.value.setOutputFormat(fm.erasure.asInstanceOf[Class[F]])
conf.value.setOutputCommitter(cm.erasure.asInstanceOf[Class[C]])
conf.value.setOutputKeyClass(km.erasure)
conf.value.setOutputValueClass(vm.erasure)
}
FileOutputFormat.setOutputPath(conf.value, HadoopFileWriter.createPathFromString(path, conf.value))
conf.value.set("mapred.job.id", jID.value.toString);
conf.value.set("mapred.tip.id", taID.value.getTaskID.toString);
conf.value.set("mapred.task.id", taID.value.toString);