package spark

import nexus.SlaveOffer

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
/**
 * Serializable wrapper around a Hadoop [[InputSplit]], so that splits can be
 * shipped to slaves. The raw split is marked @transient; serialization is
 * delegated to SerializableWritable, which uses the Writable protocol.
 */
class HdfsSplit(@transient s: InputSplit)
extends SerializableWritable[InputSplit](s)
/**
 * An RDD that reads a text file from HDFS (or any Hadoop-supported filesystem)
 * and yields its lines as Strings, one partition per Hadoop InputSplit.
 *
 * @param sc   the SparkContext this RDD belongs to
 * @param path input path (anything FileInputFormat.setInputPaths accepts)
 */
class HdfsTextFile(sc: SparkContext, path: String)
extends RDD[String, HdfsSplit](sc) {
  // JobConf and InputFormat are not serializable, so keep them @transient;
  // they are only needed on the master to compute the splits.
  @transient val conf = new JobConf()
  @transient val inputFormat = new TextInputFormat()

  FileInputFormat.setInputPaths(conf, path)
  // Hadoop's configure() path is guarded by a global lock (see ConfigureLock).
  ConfigureLock.synchronized { inputFormat.configure(conf) }

  // Ask for a minimum of 2 splits so small files still get some parallelism.
  @transient val splits_ =
    inputFormat.getSplits(conf, 2).map(new HdfsSplit(_)).toArray

  override def splits = splits_

  /**
   * Returns an iterator over the lines of one split. Uses a one-record
   * read-ahead so hasNext can probe the RecordReader; the reader is closed
   * as soon as the end of the split is reached, releasing the HDFS stream.
   * (The original code never closed the reader, leaking a stream per split.)
   */
  override def iterator(split: HdfsSplit) = new Iterator[String] {
    var reader: RecordReader[LongWritable, Text] = null
    ConfigureLock.synchronized {
      val conf = new JobConf()
      conf.set("io.file.buffer.size",
        System.getProperty("spark.buffer.size", "65536"))
      val tif = new TextInputFormat()
      tif.configure(conf)
      reader = tif.getRecordReader(split.value, conf, Reporter.NULL)
    }
    // Reusable key/value buffers; the reader overwrites them on each next().
    val lineNum = new LongWritable()
    val text = new Text()
    // gotNext: a record (or EOF) has been read ahead but not yet consumed.
    var gotNext = false
    // finished: the reader returned false; it has been closed.
    var finished = false

    override def hasNext: Boolean = {
      if (!gotNext) {
        finished = !reader.next(lineNum, text)
        if (finished) {
          // End of split: release the underlying stream promptly rather
          // than waiting for GC/finalizers.
          reader.close()
        }
        gotNext = true
      }
      !finished
    }

    override def next: String = {
      // Delegate the read-ahead to hasNext instead of duplicating it here.
      if (!hasNext)
        throw new java.util.NoSuchElementException("end of stream")
      gotNext = false
      text.toString
    }
  }

  /**
   * Preferred hosts for a split, taken from the Hadoop split's block
   * locations.
   */
  override def preferredLocations(split: HdfsSplit) = {
    // TODO: Filtering out "localhost" in case of file:// URLs
    split.value.getLocations().filter(_ != "localhost")
  }
}
object ConfigureLock {}