SPARK-2038: rename "conf" parameters in the saveAsHadoop functions
to distinguish them from the SparkConf object

https://issues.apache.org/jira/browse/SPARK-2038

Author: CodingCat <zhunansjtu@gmail.com>

Closes #1087 from CodingCat/SPARK-2038 and squashes the following commits:

763975f [CodingCat] style fix
d91288d [CodingCat] rename "conf" parameters in the saveAsHadoop functions
parent 2794990e9e
commit 443f5e1bbc
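The rename matters most for callers that pass the Hadoop configuration as a named argument: with the old parameter name, writing conf = ... read as though the method might take a SparkConf. The sketch below is illustrative only and not part of this commit; it assumes a running SparkContext named sc, the Spark 1.x SparkContext._ implicits, and a placeholder output path, and it uses the parameter name introduced by this change.

// Illustrative caller sketch (not part of this commit). Assumes a running
// SparkContext named sc and the Spark 1.x implicit conversions.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkContext._

val pairs = sc.parallelize(Seq(("a", "1"), ("b", "2")))

// Hadoop-side output settings live in a Hadoop Configuration, not a SparkConf.
val hadoopSideConf = new Configuration(sc.hadoopConfiguration)

pairs.saveAsNewAPIHadoopFile(
  "/tmp/pairs-output",                       // placeholder output path
  classOf[String],                           // key class handed to the output format
  classOf[String],                           // value class handed to the output format
  classOf[TextOutputFormat[String, String]], // new-API output format
  hadoopConf = hadoopSideConf)               // named argument uses the renamed parameter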
@@ -719,9 +719,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = self.context.hadoopConfiguration)
+      hadoopConf: Configuration = self.context.hadoopConfiguration)
   {
-    val job = new NewAPIHadoopJob(conf)
+    val job = new NewAPIHadoopJob(hadoopConf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     job.setOutputFormatClass(outputFormatClass)
@@ -752,24 +752,25 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
+      hadoopConf: JobConf = new JobConf(self.context.hadoopConfiguration),
       codec: Option[Class[_ <: CompressionCodec]] = None) {
-    conf.setOutputKeyClass(keyClass)
-    conf.setOutputValueClass(valueClass)
+    hadoopConf.setOutputKeyClass(keyClass)
+    hadoopConf.setOutputValueClass(valueClass)
     // Doesn't work in Scala 2.9 due to what may be a generics bug
     // TODO: Should we uncomment this for Scala 2.10?
     // conf.setOutputFormat(outputFormatClass)
-    conf.set("mapred.output.format.class", outputFormatClass.getName)
+    hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
     for (c <- codec) {
-      conf.setCompressMapOutput(true)
-      conf.set("mapred.output.compress", "true")
-      conf.setMapOutputCompressorClass(c)
-      conf.set("mapred.output.compression.codec", c.getCanonicalName)
-      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+      hadoopConf.setCompressMapOutput(true)
+      hadoopConf.set("mapred.output.compress", "true")
+      hadoopConf.setMapOutputCompressorClass(c)
+      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
+      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-    conf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
-    saveAsHadoopDataset(conf)
+    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(hadoopConf,
+      SparkHadoopWriter.createPathFromString(path, hadoopConf))
+    saveAsHadoopDataset(hadoopConf)
   }

   /**
@@ -778,8 +779,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * output paths required (e.g. a table name to write to) in the same way as it would be
    * configured for a Hadoop MapReduce job.
    */
-  def saveAsNewAPIHadoopDataset(conf: Configuration) {
-    val job = new NewAPIHadoopJob(conf)
+  def saveAsNewAPIHadoopDataset(hadoopConf: Configuration) {
+    val job = new NewAPIHadoopJob(hadoopConf)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
@@ -835,10 +836,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
    * MapReduce job.
    */
-  def saveAsHadoopDataset(conf: JobConf) {
-    val outputFormatInstance = conf.getOutputFormat
-    val keyClass = conf.getOutputKeyClass
-    val valueClass = conf.getOutputValueClass
+  def saveAsHadoopDataset(hadoopConf: JobConf) {
+    val outputFormatInstance = hadoopConf.getOutputFormat
+    val keyClass = hadoopConf.getOutputKeyClass
+    val valueClass = hadoopConf.getOutputValueClass
     if (outputFormatInstance == null) {
       throw new SparkException("Output format class not set")
     }
@@ -848,18 +849,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     if (valueClass == null) {
       throw new SparkException("Output value class not set")
     }
-    SparkHadoopUtil.get.addCredentials(conf)
+    SparkHadoopUtil.get.addCredentials(hadoopConf)

     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")

     if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
-      val ignoredFs = FileSystem.get(conf)
-      conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
+      val ignoredFs = FileSystem.get(hadoopConf)
+      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
     }

-    val writer = new SparkHadoopWriter(conf)
+    val writer = new SparkHadoopWriter(hadoopConf)
     writer.preSetup()

     def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
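For comparison, the old-API saveAsHadoopDataset shown in the last hunks expects a fully configured JobConf: the output format, key/value classes, and output path all come from the JobConf, while Spark adds credentials and validates the output spec before writing. The sketch below is likewise illustrative only and not part of the commit; sc and the output path are assumptions.

// Illustrative old-API sketch (not part of this commit). Assumes a running
// SparkContext named sc and the Spark 1.x implicit conversions.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}
import org.apache.spark.SparkContext._

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/dataset-output")) // placeholder path

sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) } // build Writables inside the task
  .saveAsHadoopDataset(jobConf)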