Move Configuration broadcasts to SparkContext.

Harvey 2013-09-22 14:43:58 -07:00
parent a6eeb5ffd5
commit ef34cfb26c
2 changed files with 6 additions and 8 deletions

@@ -333,7 +333,11 @@ class SparkContext(
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
-    new HadoopDatasetRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+    // Add necessary security credentials to the JobConf before broadcasting it.
+    SparkEnv.get.hadoop.addCredentials(conf)
+    // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
+    val confBroadcast = broadcast(new SerializableWritable(conf))
+    new HadoopDatasetRDD(this, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
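For context, the call site is unaffected by this change: hadoopRDD() now handles the credential setup and the broadcast internally. A minimal sketch of typical usage follows; the local master string and the input path are illustrative, not part of this commit.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.SparkContext

// Hypothetical caller. After this commit, hadoopRDD() adds credentials and
// broadcasts the JobConf itself, so calling code stays exactly the same.
object HadoopRDDExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "HadoopRDDExample")
    val conf = new JobConf()
    FileInputFormat.setInputPaths(conf, "/tmp/input")  // illustrative path
    val lines = sc.hadoopRDD(conf, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text])
    println(lines.count())
    sc.stop()
  }
}

Broadcasting the JobConf once per dataset means each executor fetches it a single time, rather than receiving a fresh copy serialized into every task.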

@@ -69,19 +69,13 @@ class HadoopFileRDD[K, V](
  */
 class HadoopDatasetRDD[K, V](
     sc: SparkContext,
-    @transient conf: JobConf,
+    confBroadcast: Broadcast[SerializableWritable[JobConf]],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
   extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
 
-  // Add necessary security credentials to the JobConf before broadcasting it.
-  SparkEnv.get.hadoop.addCredentials(conf)
-
-  // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
-  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
-
   override def getJobConf(): JobConf = confBroadcast.value.value
 }
 
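Why the JobConf is wrapped before broadcasting: JobConf implements Hadoop's Writable interface but not java.io.Serializable, so it cannot be shipped directly with Java serialization. The SerializableWritable wrapper bridges the two protocols. Below is a standalone sketch of the idea; the real class lives in org.apache.spark, and this hypothetical SerializableWritableSketch is illustrative only.

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{ObjectWritable, Writable}

// Illustrative sketch of the SerializableWritable pattern: delegate Java
// serialization to Hadoop's own Writable protocol via ObjectWritable.
class SerializableWritableSketch[T <: Writable](@transient var t: T)
  extends Serializable {

  def value: T = t

  // Invoked by Java serialization: write the Writable with Hadoop's codec.
  private def writeObject(out: ObjectOutputStream) {
    out.defaultWriteObject()
    new ObjectWritable(t).write(out)
  }

  // Invoked by Java deserialization: read the Writable back and restore t.
  private def readObject(in: ObjectInputStream) {
    in.defaultReadObject()
    val ow = new ObjectWritable()
    ow.setConf(new Configuration())
    ow.readFields(in)
    t = ow.get().asInstanceOf[T]
  }
}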