[SPARK-4437] update doc for WholeCombineFileRecordReader

update doc for WholeCombineFileRecordReader

Author: Davies Liu <davies@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #3301 from davies/fix_doc and squashes the following commits:

1d7422f [Davies Liu] Merge pull request #2 from JoshRosen/whole-text-file-cleanup
dc3d21a [Josh Rosen] More genericization in ConfigurableCombineFileRecordReader.
95d13eb [Davies Liu] address comment
bf800b9 [Davies Liu] update doc for WholeCombineFileRecordReader
This commit is contained in:
Davies Liu 2014-12-16 11:19:36 -08:00 committed by Josh Rosen
parent c246b95dd2
commit ed362008f0
2 changed files with 25 additions and 30 deletions

View file

@ -19,7 +19,6 @@ package org.apache.spark.input
import scala.collection.JavaConversions._
import org.apache.hadoop.conf.{Configuration, Configurable}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
@ -38,18 +37,13 @@ private[spark] class WholeTextFileInputFormat
override protected def isSplitable(context: JobContext, file: Path): Boolean = false
private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf
override def createRecordReader(
split: InputSplit,
context: TaskAttemptContext): RecordReader[String, String] = {
val reader = new WholeCombineFileRecordReader(split, context)
reader.setConf(conf)
val reader =
new ConfigurableCombineFileRecordReader(split, context, classOf[WholeTextFileRecordReader])
reader.setConf(getConf)
reader
}

View file

@ -17,7 +17,7 @@
package org.apache.spark.input
import org.apache.hadoop.conf.{Configuration, Configurable}
import org.apache.hadoop.conf.{Configuration, Configurable => HConfigurable}
import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.io.Text
@ -27,6 +27,18 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecor
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
/**
* A trait to implement [[org.apache.hadoop.conf.Configurable Configurable]] interface.
*/
private[spark] trait Configurable extends HConfigurable {
private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf
}
/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
* out in a key-value pair, where the key is the file path and the value is the entire content of
@ -38,12 +50,6 @@ private[spark] class WholeTextFileRecordReader(
index: Integer)
extends RecordReader[String, String] with Configurable {
private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf
private[this] val path = split.getPath(index)
private[this] val fs = path.getFileSystem(context.getConfiguration)
@ -87,29 +93,24 @@ private[spark] class WholeTextFileRecordReader(
/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
* out in a key-value pair, where the key is the file path and the value is the entire content of
* the file.
* A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]]
* that can pass Hadoop Configuration to [[org.apache.hadoop.conf.Configurable Configurable]]
* RecordReaders.
*/
private[spark] class WholeCombineFileRecordReader(
private[spark] class ConfigurableCombineFileRecordReader[K, V](
split: InputSplit,
context: TaskAttemptContext)
extends CombineFileRecordReader[String, String](
context: TaskAttemptContext,
recordReaderClass: Class[_ <: RecordReader[K, V] with HConfigurable])
extends CombineFileRecordReader[K, V](
split.asInstanceOf[CombineFileSplit],
context,
classOf[WholeTextFileRecordReader]
recordReaderClass
) with Configurable {
private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf
override def initNextRecordReader(): Boolean = {
val r = super.initNextRecordReader()
if (r) {
this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
this.curReader.asInstanceOf[HConfigurable].setConf(getConf)
}
r
}