[SPARK-28203][CORE][PYTHON] PythonRDD should respect SparkContext's hadoop configuration

## What changes were proposed in this pull request?
1. `PythonHadoopUtil.mapToConf` now generates a `Configuration` with `loadDefaults` disabled, so the resulting conf contains only the entries passed in from the Python side.
2. The Hadoop conf is now merged consistently in the several places `PythonRDD` builds one: the per-job conf map is merged with the SparkContext's `hadoopConfiguration` (see the sketch after this list).
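
For reference, a minimal standalone sketch of the intended merge semantics. The real helper is `getMergedConf` inside `PythonRDD.scala` (visible in the diff below); the version here is illustrative only and not the committed code:

```scala
// Illustrative sketch, not the actual Spark helper: job-specific entries supplied
// from the Python side are laid over a copy of SparkContext's hadoopConfiguration,
// so cluster-level settings survive unless the job explicitly overrides them.
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration

def mergedConf(
    confAsMap: java.util.Map[String, String],
    baseConf: Configuration): Configuration = {
  val merged = new Configuration(baseConf) // start from SparkContext's Hadoop conf
  confAsMap.asScala.foreach { case (k, v) => merged.set(k, v) } // job-level keys win
  merged
}
```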

## How was this patch tested?
Added a new test; the change is also covered by existing tests.

Closes #25002 from advancedxy/SPARK-28203.

Authored-by: Xianjin YE <advancedxy@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
Xianjin YE 2019-08-15 10:39:33 +09:00 committed by HyukjinKwon
parent 3a4afce96c
commit 3ec24fd128
3 changed files with 89 additions and 8 deletions

core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala

@@ -156,7 +156,7 @@ private[python] object PythonHadoopUtil {
    * Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
    */
   def mapToConf(map: java.util.Map[String, String]): Configuration = {
-    val conf = new Configuration()
+    val conf = new Configuration(false)
     map.asScala.foreach { case (k, v) => conf.set(k, v) }
     conf
   }
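
For context, a quick sketch (not part of this commit) of the difference `loadDefaults = false` makes; the exact entries and counts depend on the Hadoop version and any `*-site.xml` files on the classpath:

```scala
import org.apache.hadoop.conf.Configuration

val withDefaults = new Configuration()         // loads *-default.xml / *-site.xml resources
val withoutDefaults = new Configuration(false) // starts completely empty

println(withDefaults.size())    // typically > 0 (fs.defaultFS, io.file.buffer.size, ...)
println(withoutDefaults.size()) // 0: only keys set explicitly will appear
```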

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

@@ -335,7 +335,7 @@ private[spark] object PythonRDD extends Logging {
       valueConverterClass: String,
       confAsMap: java.util.HashMap[String, String],
       batchSize: Int): JavaRDD[Array[Byte]] = {
-    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val conf = getMergedConf(confAsMap, sc.hadoopConfiguration())
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
@@ -404,7 +404,7 @@ private[spark] object PythonRDD extends Logging {
       valueConverterClass: String,
       confAsMap: java.util.HashMap[String, String],
       batchSize: Int): JavaRDD[Array[Byte]] = {
-    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val conf = getMergedConf(confAsMap, sc.hadoopConfiguration())
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
@@ -620,7 +620,7 @@ private[spark] object PythonRDD extends Logging {
       keyConverterClass: String,
       valueConverterClass: String,
       useNewAPI: Boolean): Unit = {
-    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val conf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
     val converted = convertRDD(SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized),
       keyConverterClass, valueConverterClass, new JavaToWritableConverter)
     if (useNewAPI) {
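
The practical effect of the merge, sketched below under the usual `spark.hadoop.*` convention (the property name is only an example, not something this commit touches):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// spark.hadoop.* entries on SparkConf are copied into SparkContext#hadoopConfiguration,
// which the call sites above now merge into the per-job Hadoop conf instead of ignoring.
val sparkConf = new SparkConf()
  .setAppName("hadoop-conf-propagation")
  .setMaster("local")
  .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true")
val sc = new SparkContext(sparkConf)

// Visible here, and therefore to any job conf PythonRDD builds from it.
assert(sc.hadoopConfiguration.get(
  "mapreduce.input.fileinputformat.input.dir.recursive") == "true")
sc.stop()
```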

core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala

@@ -17,16 +17,42 @@
 package org.apache.spark.api.python
 
-import java.io.{ByteArrayOutputStream, DataOutputStream}
+import java.io.{ByteArrayOutputStream, DataOutputStream, File}
 import java.net.{InetAddress, Socket}
 import java.nio.charset.StandardCharsets
+import java.util
 
+import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 
-class PythonRDDSuite extends SparkFunSuite {
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.{HadoopRDD, RDD}
+import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer}
+import org.apache.spark.util.Utils
+
+class PythonRDDSuite extends SparkFunSuite with LocalSparkContext {
+
+  var tempDir: File = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    tempDir = Utils.createTempDir()
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      Utils.deleteRecursively(tempDir)
+    } finally {
+      super.afterAll()
+    }
+  }
 
   test("Writing large strings to the worker") {
     val input: List[String] = List("a"*100000)
@@ -65,4 +91,59 @@ class PythonRDDSuite extends SparkFunSuite {
       throw new Exception("exception within handleConnection")
     }
   }
+
+  test("mapToConf should not load defaults") {
+    val map = Map("key" -> "value")
+    val conf = PythonHadoopUtil.mapToConf(map.asJava)
+    assert(conf.size() === map.size)
+    assert(conf.get("key") === map("key"))
+  }
+
+  test("SparkContext's hadoop configuration should be respected in PythonRDD") {
+    // hadoop conf with default configurations
+    val hadoopConf = new Configuration()
+    assert(hadoopConf.size() > 0)
+    val headEntry = hadoopConf.asScala.head
+    val (firstKey, firstValue) = (headEntry.getKey, headEntry.getValue)
+
+    // passed to spark conf with a different value (prefixed by spark.)
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+    conf.set("spark.hadoop." + firstKey, "spark." + firstValue)
+    sc = new SparkContext(conf)
+    val outDir = new File(tempDir, "output").getAbsolutePath
+    // write output as HadoopRDD's input
+    sc.makeRDD(1 to 1000, 10).saveAsTextFile(outDir)
+
+    val javaSparkContext = new JavaSparkContext(sc)
+    val confMap = new util.HashMap[String, String]()
+    // set input path in job conf
+    confMap.put(FileInputFormat.INPUT_DIR, outDir)
+    val pythonRDD = PythonRDD.hadoopRDD(
+      javaSparkContext,
+      classOf[TextInputFormat].getCanonicalName,
+      classOf[LongWritable].getCanonicalName,
+      classOf[Text].getCanonicalName,
+      null,
+      null,
+      confMap,
+      0
+    )
+
+    @tailrec
+    def getRootRDD(rdd: RDD[_]): RDD[_] = {
+      rdd.dependencies match {
+        case Nil => rdd
+        case dependency :: _ => getRootRDD(dependency.rdd)
+      }
+    }
+    // retrieve the HadoopRDD, as it's the root RDD
+    val hadoopRDD = getRootRDD(pythonRDD).asInstanceOf[HadoopRDD[_, _]]
+
+    val jobConf = hadoopRDD.getConf
+    // the jobConf passed to HadoopRDD should contain SparkContext's hadoop items rather than
+    // the default configs in the client's Configuration
+    assert(jobConf.get(firstKey) === "spark." + firstValue)
+  }
 }