[SPARK-1415] Hadoop min split for wholeTextFiles()
The JIRA issue is [here](https://issues.apache.org/jira/browse/SPARK-1415).

The new Hadoop `InputFormat` API does not provide a `minSplits` parameter, which makes the API incompatible between `HadoopRDD` and `NewHadoopRDD`. This PR constructs compatible APIs. Although `minSplits` is deprecated by the new Hadoop API, we think it is better to keep the APIs compatible here.

**Note** that `minSplits` in `wholeTextFiles` can only be treated as a *suggestion*: because `isSplitable()` returns `false`, the real number of splits may not be greater than `minSplits`.

Author: Xusen Yin <yinxusen@gmail.com>

Closes #376 from yinxusen/hadoop-min-split and squashes the following commits:

76417f6 [Xusen Yin] refine comments
c10af60 [Xusen Yin] refine comments and rewrite new class for wholeTextFile
766d05b [Xusen Yin] refine Java API and comments
4875755 [Xusen Yin] add minSplits for WholeTextFiles
Commit 037fe4d2ba, parent 4bc07eebbf.
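To make the new API concrete, here is a minimal usage sketch (not part of the patch): the HDFS path, app name, and `minSplits = 3` are made-up values, and `minSplits` remains only a suggestion because whole files are never split.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wholeTextFiles-minSplits").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Hypothetical directory; each record is (file path, whole file content).
    val files = sc.wholeTextFiles("hdfs://a-hdfs-path", minSplits = 3)

    // minSplits only guides partitioning: whole files are combined into splits
    // whose maximum size is derived from totalLen / minSplits; files are never cut apart.
    println(s"partitions = ${files.partitions.length}")
    files.map(_._1).collect().foreach(println)

    sc.stop()
  }
}
```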
@@ -454,14 +454,21 @@ class SparkContext(config: SparkConf) extends Logging {
    *   (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are preferred, as each file will be loaded fully in memory.
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minSplits A suggestion value of the minimal splitting number for input data.
    */
-  def wholeTextFiles(path: String): RDD[(String, String)] = {
-    newAPIHadoopFile(
-      path,
+  def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new WholeTextFileRDD(
+      this,
       classOf[WholeTextFileInputFormat],
       classOf[String],
-      classOf[String])
+      classOf[String],
+      updateConf,
+      minSplits)
   }
 
   /**
@@ -177,7 +177,19 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround
    *   (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are preferred, as each file will be loaded fully in memory.
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minSplits A suggestion value of the minimal splitting number for input data.
+   */
+  def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] =
+    new JavaPairRDD(sc.wholeTextFiles(path, minSplits))
+
+  /**
+   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is the content of each file.
+   *
+   * @see `wholeTextFiles(path: String, minSplits: Int)`.
    */
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))
@@ -17,6 +17,8 @@
 
 package org.apache.spark.input
 
+import scala.collection.JavaConversions._
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
@@ -44,4 +46,16 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String]
       context,
       classOf[WholeTextFileRecordReader])
   }
+
+  /**
+   * Allow minSplits set by end-user in order to keep compatibility with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minSplits: Int) {
+    val files = listStatus(context)
+    val totalLen = files.map { file =>
+      if (file.isDir) 0L else file.getLen
+    }.sum
+    val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
+    super.setMaxSplitSize(maxSplitSize)
+  }
 }
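As a quick, self-contained sanity check of the `setMaxSplitSize` arithmetic introduced above, here is a small sketch of the same computation; the file lengths and `minSplits = 3` below are invented for illustration.

```scala
// Mirrors the expression in WholeTextFileInputFormat.setMaxSplitSize:
//   maxSplitSize = ceil(totalLen / minSplits), with minSplits == 0 treated as 1.
object MaxSplitSizeSketch {
  def maxSplitSize(fileLengths: Seq[Long], minSplits: Int): Long = {
    val totalLen = fileLengths.sum
    math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
  }

  def main(args: Array[String]): Unit = {
    // Three hypothetical files totalling 10 MiB, with minSplits = 3:
    val lengths = Seq(2L << 20, 3L << 20, 5L << 20)
    // ceil(10485760 / 3) = 3495254 bytes per combined split at most.
    println(maxSplitSize(lengths, minSplits = 3))
  }
}
```

This cap is how `minSplits` becomes a suggestion: it bounds how much data each combined split may hold rather than fixing the number of splits.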
@@ -24,11 +24,18 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
-import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.input.WholeTextFileInputFormat
+import org.apache.spark.InterruptibleIterator
+import org.apache.spark.Logging
+import org.apache.spark.Partition
+import org.apache.spark.SerializableWritable
+import org.apache.spark.{SparkContext, TaskContext}
 
-private[spark]
-class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
+private[spark] class NewHadoopPartition(
+    rddId: Int,
+    val index: Int,
+    @transient rawSplit: InputSplit with Writable)
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -65,17 +72,19 @@ class NewHadoopRDD[K, V](
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
   // private val serializableConf = new SerializableWritable(conf)
 
-  private val jobtrackerId: String = {
+  private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     formatter.format(new Date())
   }
 
-  @transient private val jobId = new JobID(jobtrackerId, id)
+  @transient protected val jobId = new JobID(jobTrackerId, id)
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
-    if (inputFormat.isInstanceOf[Configurable]) {
-      inputFormat.asInstanceOf[Configurable].setConf(conf)
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
     }
     val jobContext = newJobContext(conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
@@ -91,11 +100,13 @@ class NewHadoopRDD[K, V](
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
       val conf = confBroadcast.value.value
-      val attemptId = newTaskAttemptID(jobtrackerId, id, isMap = true, split.index, 0)
+      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
       val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
       val format = inputFormatClass.newInstance
-      if (format.isInstanceOf[Configurable]) {
-        format.asInstanceOf[Configurable].setConf(conf)
+      format match {
+        case configurable: Configurable =>
+          configurable.setConf(conf)
+        case _ =>
       }
       val reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
@@ -141,3 +152,30 @@ class NewHadoopRDD[K, V](
   def getConf: Configuration = confBroadcast.value.value
 }
 
+private[spark] class WholeTextFileRDD(
+    sc : SparkContext,
+    inputFormatClass: Class[_ <: WholeTextFileInputFormat],
+    keyClass: Class[String],
+    valueClass: Class[String],
+    @transient conf: Configuration,
+    minSplits: Int)
+  extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    inputFormat.setMaxSplitSize(jobContext, minSplits)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[Partition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+    result
+  }
+}
 
@@ -626,7 +626,7 @@ public class JavaAPISuite implements Serializable {
     container.put(tempDirName+"/part-00000", new Text(content1).toString());
     container.put(tempDirName+"/part-00001", new Text(content2).toString());
 
-    JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName);
+    JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName, 3);
     List<Tuple2<String, String>> result = readRDD.collect();
 
     for (Tuple2<String, String> res : result) {
@@ -73,7 +73,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
       createNativeFile(dir, filename, contents)
     }
 
-    val res = sc.wholeTextFiles(dir.toString).collect()
+    val res = sc.wholeTextFiles(dir.toString, 3).collect()
 
     assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
       "Number of files read out does not fit with the actual value.")