diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5a36e6f5c1..456070fa7c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -454,14 +454,21 @@ class SparkContext(config: SparkConf) extends Logging {
    *   (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are preferred, as each file will be loaded fully in memory.
+   * @note Small files are preferred; large files are also allowed, but may cause bad performance.
+   *
+   * @param minSplits A suggested minimum number of splits for the input data.
    */
-  def wholeTextFiles(path: String): RDD[(String, String)] = {
-    newAPIHadoopFile(
-      path,
+  def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new WholeTextFileRDD(
+      this,
       classOf[WholeTextFileInputFormat],
       classOf[String],
-      classOf[String])
+      classOf[String],
+      updateConf,
+      minSplits)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 1e8242a2cb..7fbefe1cb0 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -177,7 +177,19 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    *   (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are preferred, as each file will be loaded fully in memory.
+   * @note Small files are preferred; large files are also allowed, but may cause bad performance.
+   *
+   * @param minSplits A suggested minimum number of splits for the input data.
+   */
+  def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] =
+    new JavaPairRDD(sc.wholeTextFiles(path, minSplits))
+
+  /**
+   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file and the value is the content of each file.
+   *
+   * @see `wholeTextFiles(path: String, minSplits: Int)`.
    */
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 4887fb6b84..80d055a895 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.input
 
+import scala.collection.JavaConversions._
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
@@ -44,4 +46,16 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[Str
       context,
       classOf[WholeTextFileRecordReader])
   }
+
+  /**
+   * Allow minSplits to be set by the end user, in order to keep compatibility with the old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minSplits: Int) {
+    val files = listStatus(context)
+    val totalLen = files.map { file =>
+      if (file.isDir) 0L else file.getLen
+    }.sum
+    val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
+    super.setMaxSplitSize(maxSplitSize)
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 2d8dfa5a16..8684b645bc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -24,11 +24,18 @@
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
-import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.input.WholeTextFileInputFormat
+import org.apache.spark.InterruptibleIterator
+import org.apache.spark.Logging
+import org.apache.spark.Partition
+import org.apache.spark.SerializableWritable
+import org.apache.spark.{SparkContext, TaskContext}
 
-private[spark]
-class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
+private[spark] class NewHadoopPartition(
+    rddId: Int,
+    val index: Int,
+    @transient rawSplit: InputSplit with Writable)
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -65,17 +72,19 @@ class NewHadoopRDD[K, V](
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
   // private val serializableConf = new SerializableWritable(conf)
 
-  private val jobtrackerId: String = {
+  private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     formatter.format(new Date())
   }
 
-  @transient private val jobId = new JobID(jobtrackerId, id)
+  @transient protected val jobId = new JobID(jobTrackerId, id)
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
-    if (inputFormat.isInstanceOf[Configurable]) {
-      inputFormat.asInstanceOf[Configurable].setConf(conf)
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
     }
     val jobContext = newJobContext(conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
@@ -91,11 +100,13 @@ class NewHadoopRDD[K, V](
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
       val conf = confBroadcast.value.value
-      val attemptId = newTaskAttemptID(jobtrackerId, id, isMap = true, split.index, 0)
+      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
       val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
       val format = inputFormatClass.newInstance
-      if (format.isInstanceOf[Configurable]) {
-        format.asInstanceOf[Configurable].setConf(conf)
+      format match {
+        case configurable: Configurable =>
+          configurable.setConf(conf)
+        case _ =>
       }
       val reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
@@ -141,3 +152,30 @@ class NewHadoopRDD[K, V](
 
   def getConf: Configuration = confBroadcast.value.value
 }
+private[spark] class WholeTextFileRDD(
+    sc: SparkContext,
+    inputFormatClass: Class[_ <: WholeTextFileInputFormat],
+    keyClass: Class[String],
+    valueClass: Class[String],
+    @transient conf: Configuration,
+    minSplits: Int)
+  extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    inputFormat.setMaxSplitSize(jobContext, minSplits)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[Partition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+    result
+  }
+}
+
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index ab2fdac553..8d2e9f1846 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -626,7 +626,7 @@ public class JavaAPISuite implements Serializable {
     container.put(tempDirName+"/part-00000", new Text(content1).toString());
     container.put(tempDirName+"/part-00001", new Text(content2).toString());
 
-    JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName);
+    JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName, 3);
     List<Tuple2<String, String>> result = readRDD.collect();
 
     for (Tuple2<String, String> res : result) {
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index e89b296d41..33d6de9a76 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -73,7 +73,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
       createNativeFile(dir, filename, contents)
     }
 
-    val res = sc.wholeTextFiles(dir.toString).collect()
+    val res = sc.wholeTextFiles(dir.toString, 3).collect()
 
     assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
       "Number of files read out does not fit with the actual value.")
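
For reference, a minimal usage sketch of the Scala API added above. The master URL, app name, and input directory are illustrative placeholders, not part of the patch, and minSplits is only a hint, so the resulting number of partitions may differ:

import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesExample {
  def main(args: Array[String]): Unit = {
    // Local master and input path are placeholders for illustration only.
    val conf = new SparkConf().setAppName("wholeTextFilesExample").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Each record is (filePath, fileContent); minSplits = 3 is a suggestion, not a guarantee.
    val files = sc.wholeTextFiles("file:///tmp/whole-text-files", minSplits = 3)

    files.collect().foreach { case (path, content) =>
      println(s"$path -> ${content.length} characters")
    }

    sc.stop()
  }
}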
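
The setMaxSplitSize helper in WholeTextFileInputFormat caps each combined split at roughly totalInputSize / minSplits, rounded up. A standalone sketch of that arithmetic follows; the object name and file sizes are made up for illustration and are not part of the patch:

object MaxSplitSizeSketch {
  // Mirrors the arithmetic in WholeTextFileInputFormat.setMaxSplitSize:
  // total input length divided by the requested minimum number of splits, rounded up.
  def maxSplitSize(fileLengths: Seq[Long], minSplits: Int): Long =
    Math.ceil(fileLengths.sum * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong

  def main(args: Array[String]): Unit = {
    // Three files totalling 10 MB with minSplits = 3 => each split is capped at ~3.3 MB.
    val lengths = Seq(2L * 1024 * 1024, 3L * 1024 * 1024, 5L * 1024 * 1024)
    println(maxSplitSize(lengths, 3)) // prints 3495254
  }
}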