From e3861ae3953d7cab66160833688c8baf84e835ad Mon Sep 17 00:00:00 2001
From: Stephen Haberman
Date: Wed, 9 Jan 2013 17:03:25 -0600
Subject: [PATCH] Provide and expose a default Hadoop Configuration.

Any "hadoop.*" system properties will be passed along into configuration.
---
 core/src/main/scala/spark/SparkContext.scala  | 18 ++++++++++++++----
 .../spark/api/java/JavaSparkContext.scala     |  7 +++++++
 2 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index bbf8272eb3..36e0938854 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -187,6 +187,18 @@ class SparkContext(
 
   private var dagScheduler = new DAGScheduler(taskScheduler)
 
+  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
+  val hadoopConfiguration = {
+    val conf = new Configuration()
+    // Copy any "hadoop.foo=bar" system properties into conf as "foo=bar"
+    for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("hadoop.")) {
+      conf.set(key.substring("hadoop.".length), System.getProperty(key))
+    }
+    val bufferSize = System.getProperty("spark.buffer.size", "65536")
+    conf.set("io.file.buffer.size", bufferSize)
+    conf
+  }
+
   // Methods for creating RDDs
 
   /** Distribute a local Scala collection to form an RDD. */
@@ -231,10 +243,8 @@ class SparkContext(
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ) : RDD[(K, V)] = {
-    val conf = new JobConf()
+    val conf = new JobConf(hadoopConfiguration)
     FileInputFormat.setInputPaths(conf, path)
-    val bufferSize = System.getProperty("spark.buffer.size", "65536")
-    conf.set("io.file.buffer.size", bufferSize)
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
@@ -276,7 +286,7 @@ class SparkContext(
       fm.erasure.asInstanceOf[Class[F]],
       km.erasure.asInstanceOf[Class[K]],
       vm.erasure.asInstanceOf[Class[V]],
-      new Configuration)
+      hadoopConfiguration)
   }
 
   /**
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index 88ab2846be..12e2a0bdac 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -355,6 +355,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
   def clearFiles() {
     sc.clearFiles()
   }
+
+  /**
+   * Returns the Hadoop configuration used for the Hadoop code (e.g. file systems) we reuse.
+   */
+  def hadoopConfiguration(): Configuration = {
+    sc.hadoopConfiguration
+  }
 }
 
 object JavaSparkContext {
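
For reference, below is a minimal usage sketch (not part of the patch) of how the new default
hadoopConfiguration behaves. It assumes the pre-1.0 "spark" package and a local master; the
HadoopConfExample object and the Hadoop property names are illustrative only.

    import spark.SparkContext

    object HadoopConfExample {
      def main(args: Array[String]) {
        // Properties must be set before the SparkContext is created, since
        // hadoopConfiguration is built while the SparkContext constructor runs.
        // Any "hadoop.*" system property is copied in with the "hadoop." prefix stripped...
        System.setProperty("hadoop.io.compression.codecs",
          "org.apache.hadoop.io.compress.GzipCodec")
        // ...and spark.buffer.size (bytes) becomes io.file.buffer.size.
        System.setProperty("spark.buffer.size", "131072")

        val sc = new SparkContext("local", "HadoopConfExample")
        val hadoopConf = sc.hadoopConfiguration
        println(hadoopConf.get("io.compression.codecs"))  // GzipCodec, from the hadoop.* property
        println(hadoopConf.get("io.file.buffer.size"))    // 131072, from spark.buffer.size
        sc.stop()
      }
    }

The same Configuration is what hadoopFile and newAPIHadoopFile now start from (via
new JobConf(hadoopConfiguration) and the hadoopConfiguration argument), so per-job JobConf
settings still apply on top of these defaults.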