Provide and expose a default Hadoop Configuration.

Any "hadoop.*" system properties will be passed along into configuration.
Stephen Haberman 2013-01-09 17:03:25 -06:00
parent 14972141f9
commit e3861ae395
2 changed files with 21 additions and 4 deletions
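
As a quick illustration of the behaviour described in the commit message: any JVM system property whose name starts with "hadoop." is copied into the shared configuration with the prefix stripped, and io.file.buffer.size is seeded from spark.buffer.size (65536 unless overridden). A minimal sketch against the pre-Apache "spark" package; the property value and app name are made up:

    import spark.SparkContext

    // Hypothetical value, set before the SparkContext is created.
    System.setProperty("hadoop.fs.default.name", "hdfs://namenode:9000")

    val sc = new SparkContext("local", "hadoop-conf-example")

    // The "hadoop." prefix is stripped when the property is copied in.
    assert(sc.hadoopConfiguration.get("fs.default.name") == "hdfs://namenode:9000")

    // Without spark.buffer.size set, io.file.buffer.size falls back to 65536.
    assert(sc.hadoopConfiguration.get("io.file.buffer.size") == "65536")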


@@ -187,6 +187,18 @@ class SparkContext(
 
   private var dagScheduler = new DAGScheduler(taskScheduler)
 
+  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
+  val hadoopConfiguration = {
+    val conf = new Configuration()
+    // Copy any "hadoop.foo=bar" system properties into conf as "foo=bar"
+    for (key <- System.getProperties.stringPropertyNames.toArray(new Array[String](0)) if key.startsWith("hadoop.")) {
+      conf.set(key.substring("hadoop.".length), System.getProperty(key))
+    }
+    val bufferSize = System.getProperty("spark.buffer.size", "65536")
+    conf.set("io.file.buffer.size", bufferSize)
+    conf
+  }
+
   // Methods for creating RDDs
 
   /** Distribute a local Scala collection to form an RDD. */
@@ -231,10 +243,8 @@ class SparkContext(
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ) : RDD[(K, V)] = {
-    val conf = new JobConf()
+    val conf = new JobConf(hadoopConfiguration)
     FileInputFormat.setInputPaths(conf, path)
-    val bufferSize = System.getProperty("spark.buffer.size", "65536")
-    conf.set("io.file.buffer.size", bufferSize)
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
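
Because hadoopFile now seeds its JobConf from hadoopConfiguration (the JobConf copy constructor clones the entries), anything set on the shared configuration is picked up when files are read. A sketch under those assumptions; the credential keys and bucket path are placeholders:

    import spark.SparkContext

    val sc = new SparkContext("local", "propagation-example")

    // Placeholder credentials; any Hadoop property would propagate the same way.
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "MY_ACCESS_KEY")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "MY_SECRET_KEY")

    // textFile goes through hadoopFile, whose JobConf is now copied from
    // hadoopConfiguration, so the settings above are in effect for the read.
    val lines = sc.textFile("s3n://some-bucket/input.txt")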
@@ -276,7 +286,7 @@ class SparkContext(
         fm.erasure.asInstanceOf[Class[F]],
         km.erasure.asInstanceOf[Class[K]],
         vm.erasure.asInstanceOf[Class[V]],
-        new Configuration)
+        hadoopConfiguration)
   }
 
   /**


@@ -355,6 +355,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def clearFiles() {
     sc.clearFiles()
   }
+
+  /**
+   * Returns the Hadoop configuration used for the Hadoop code (e.g. file systems) we reuse.
+   */
+  def hadoopConfiguration(): Configuration = {
+    sc.hadoopConfiguration
+  }
 }
 
 object JavaSparkContext {
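
On the Java API side, the new getter hands back the same underlying Configuration object, so changes made through it are visible to the wrapped SparkContext. A sketch from Scala against the pre-Apache package layout; the app name and the setting value are illustrative:

    import spark.api.java.JavaSparkContext

    val jsc = new JavaSparkContext("local", "java-api-example")

    // Same Configuration instance as jsc.sc.hadoopConfiguration.
    val conf = jsc.hadoopConfiguration()
    conf.set("io.file.buffer.size", "131072")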