Clone Hadoop keys and values by default, and reuse the objects only if cloneKeyValues is explicitly set to false.
commit 277b4a36c5
parent c0f0155eca
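Background for the change: Hadoop's RecordReader implementations typically reuse a single Writable instance for every record they return, so handing those objects directly to user code can silently corrupt results once records are cached or collected. After this commit HadoopRDD and NewHadoopRDD clone each key and value by default, and a new cloneKeyValues flag lets callers opt back into object reuse when they transform records immediately. A minimal usage sketch, assuming a locally built SparkContext and a hypothetical SequenceFile of IntWritable/Text pairs:

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.spark.SparkContext

    object CloneKeyValuesExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "clone-example")

        // Default after this commit: every key/value is cloned, so caching the
        // raw Writables is safe.
        val cloned = sc.sequenceFile("hdfs:///tmp/events", classOf[IntWritable], classOf[Text])
        cloned.cache()

        // Opting out reuses the reader's Writable instances; convert them to
        // immutable values right away, or cached elements may all end up
        // pointing at the last record read.
        val reused = sc.sequenceFile("hdfs:///tmp/events", classOf[IntWritable], classOf[Text],
          cloneKeyValues = false)
        val safe = reused.map { case (k, v) => (k.get, v.toString) }

        println(safe.count())
        sc.stop()
      }
    }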
SparkContext.scala
@@ -341,25 +341,27 @@ class SparkContext(
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    * etc).
    */
-  def hadoopRDD[K, V](
+  def hadoopRDD[K: ClassTag, V: ClassTag](
       conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
+      minSplits: Int = defaultMinSplits,
+      cloneKeyValues: Boolean = true
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
   }

   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V](
+  def hadoopFile[K: ClassTag, V: ClassTag](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
+      minSplits: Int = defaultMinSplits,
+      cloneKeyValues: Boolean = true
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
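A hedged usage sketch for the signatures above, exercising the old-API hadoopRDD entry point with the new cloneKeyValues parameter (the JobConf setup and input path are illustrative assumptions, and sc is an existing SparkContext):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

    // Build a JobConf by hand, ask hadoopRDD to reuse the reader's Writables
    // (cloneKeyValues = false), and copy to plain Strings in the very next map.
    val jobConf = new JobConf()
    FileInputFormat.setInputPaths(jobConf, "hdfs:///tmp/logs")  // hypothetical path

    val lines = sc.hadoopRDD(
      jobConf,
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      minSplits = 4,
      cloneKeyValues = false)

    val lengths = lines.map { case (_, text) => text.toString.length }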
@@ -371,7 +373,8 @@ class SparkContext(
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits)
+      minSplits,
+      cloneKeyValues)
   }

   /**
@@ -382,14 +385,15 @@ class SparkContext(
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
-      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
-      : RDD[(K, V)] = {
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int,
+      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]
+      ): RDD[(K, V)] = {
     hadoopFile(path,
         fm.runtimeClass.asInstanceOf[Class[F]],
         km.runtimeClass.asInstanceOf[Class[K]],
         vm.runtimeClass.asInstanceOf[Class[V]],
-        minSplits)
+        minSplits,
+        cloneKeyValues = cloneKeyValues)
   }

   /**
@@ -400,61 +404,67 @@ class SparkContext(
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneKeyValues: Boolean = true)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits)
+    hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues)

   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
-      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String,
+      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]
+      ): RDD[(K, V)] = {
     newAPIHadoopFile(
         path,
         fm.runtimeClass.asInstanceOf[Class[F]],
         km.runtimeClass.asInstanceOf[Class[K]],
-        vm.runtimeClass.asInstanceOf[Class[V]])
+        vm.runtimeClass.asInstanceOf[Class[V]],
+        cloneKeyValues = cloneKeyValues)
   }

   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+  def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
       path: String,
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
+      conf: Configuration = hadoopConfiguration,
+      cloneKeyValues: Boolean = true): RDD[(K, V)] = {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneKeyValues)
   }

   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    */
-  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+  def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
       conf: Configuration = hadoopConfiguration,
       fClass: Class[F],
       kClass: Class[K],
-      vClass: Class[V]): RDD[(K, V)] = {
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+      vClass: Class[V],
+      cloneKeyValues: Boolean = true): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneKeyValues)
   }

   /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String,
+  def sequenceFile[K: ClassTag, V: ClassTag](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int
+      minSplits: Int,
+      cloneKeyValues: Boolean = true
       ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
   }

   /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+  def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
+      cloneKeyValues: Boolean = true): RDD[(K, V)] =
+    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneKeyValues)

   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -472,8 +482,8 @@ class SparkContext(
    * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
    * allow it to figure out the Writable class to use in the subclass case.
    */
-  def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
-      (implicit km: ClassTag[K], vm: ClassTag[V],
+  def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits,
+      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V],
           kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
     val kc = kcf()
@@ -481,7 +491,7 @@ class SparkContext(
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
-        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
+        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneKeyValues)
     writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
   }

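For the converter-based sequenceFile overload above, a short sketch under the usual assumptions (import SparkContext._ for the WritableConverter implicits; the path is hypothetical). cloneKeyValues is simply forwarded to the underlying hadoopFile call.

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._  // WritableConverter and pair-RDD implicits

    val sc = new SparkContext("local", "sequence-file-example")
    // Reads a SequenceFile as (Int, String) via the implicit converters; the
    // Writables are cloned by default before conversion.
    val pairs = sc.sequenceFile[Int, String]("hdfs:///tmp/counts", minSplits = 8,
      cloneKeyValues = true)
    println(pairs.count())
    sc.stop()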

HadoopRDD.scala
@@ -19,7 +19,9 @@ package org.apache.spark.rdd
 import java.io.EOFException

-import org.apache.hadoop.mapred.FileInputFormat
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.{Configuration, Configurable}
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -31,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.NextIterator
-import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.spark.util.Utils.cloneWritables


 /**
@@ -62,14 +64,15 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
  */
-class HadoopRDD[K, V](
+class HadoopRDD[K: ClassTag, V: ClassTag](
     sc: SparkContext,
     broadcastedConf: Broadcast[SerializableWritable[Configuration]],
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    minSplits: Int)
+    minSplits: Int,
+    cloneKeyValues: Boolean)
   extends RDD[(K, V)](sc, Nil) with Logging {

   def this(
@@ -78,7 +81,8 @@ class HadoopRDD[K, V](
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int) = {
+      minSplits: Int,
+      cloneKeyValues: Boolean) = {
     this(
       sc,
       sc.broadcast(new SerializableWritable(conf))
@@ -87,7 +91,7 @@ class HadoopRDD[K, V](
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits)
+      minSplits, cloneKeyValues)
   }

   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -169,7 +173,11 @@ class HadoopRDD[K, V](
         case eof: EOFException =>
           finished = true
       }
-      (key, value)
+      if (cloneKeyValues) {
+        (cloneWritables(key, getConf), cloneWritables(value, getConf))
+      } else {
+        (key, value)
+      }
     }

     override def close() {
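The branch added to the record iterator above is the heart of the patch: when cloneKeyValues is false the iterator keeps returning the same two Writable objects that the RecordReader mutates in place. A standalone illustration of that reuse behaviour, using only the Hadoop API (not part of the patch):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{Text, WritableUtils}
    import scala.collection.mutable.ArrayBuffer

    // One reusable Text instance versus cloned copies, mimicking what a
    // RecordReader does between records.
    val reusable = new Text()
    val reusedRefs = new ArrayBuffer[Text]()
    val clonedCopies = new ArrayBuffer[Text]()
    val conf = new Configuration()

    for (word <- Seq("a", "b", "c")) {
      reusable.set(word)                                   // the reader overwrites the same object
      reusedRefs += reusable                               // three references to one mutated object
      clonedCopies += WritableUtils.clone(reusable, conf)  // three independent copies
    }
    // reusedRefs.map(_.toString)    == Seq("c", "c", "c")
    // clonedCopies.map(_.toString)  == Seq("a", "b", "c")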

NewHadoopRDD.scala
@@ -20,11 +20,14 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date

+import scala.reflect.ClassTag
+
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._

 import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.util.Utils.cloneWritables


 private[spark]
@@ -36,12 +39,13 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
   override def hashCode(): Int = (41 * (41 + rddId) + index)
 }

-class NewHadoopRDD[K, V](
+class NewHadoopRDD[K: ClassTag, V: ClassTag](
     sc : SparkContext,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient conf: Configuration)
+    @transient conf: Configuration,
+    cloneKeyValues: Boolean)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -105,7 +109,12 @@ class NewHadoopRDD[K, V](
         throw new java.util.NoSuchElementException("End of stream")
       }
       havePair = false
-      (reader.getCurrentKey, reader.getCurrentValue)
+      val key = reader.getCurrentKey
+      val value = reader.getCurrentValue
+      if (cloneKeyValues) {
+        (cloneWritables(key, conf), cloneWritables(value, conf))
+      } else
+        (key, value)
     }

     private def close() {
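The new-API iterator above applies the same pattern to reader.getCurrentKey and reader.getCurrentValue. A hedged call-site sketch (assumes an existing SparkContext sc and a hypothetical input path); cloning stays at its default unless the caller turns it off:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // Explicit-class overload of newAPIHadoopFile; cloneKeyValues defaults to
    // true, and can be disabled by name because conf also has a default value.
    val records = sc.newAPIHadoopFile(
      "hdfs:///tmp/new-api-logs",
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text])
    println(records.count())

    val reusedRecords = sc.newAPIHadoopFile(
      "hdfs:///tmp/new-api-logs",
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      cloneKeyValues = false)
    val sizes = reusedRecords.map { case (_, v) => v.getLength }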

Utils.scala
@@ -26,23 +26,42 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}

 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder

+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+import org.apache.hadoop.io._

 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 import org.apache.spark.deploy.SparkHadoopUtil
 import java.nio.ByteBuffer
-import org.apache.spark.{SparkConf, SparkContext, SparkException, Logging}
+import org.apache.spark.{SparkConf, SparkException, Logging}


 /**
  * Various utility methods used by Spark.
  */
 private[spark] object Utils extends Logging {

+  /**
+   * We try to clone for most common types of writables and we call WritableUtils.clone otherwise
+   * intention is to optimize, for example for NullWritable there is no need and for Long, int and
+   * String creating a new object with value set would be faster.
+   */
+  def cloneWritables[T: ClassTag](obj: T, conf: Configuration): T = {
+    val cloned = classTag[T] match {
+      case ClassTag(_: Text) => new Text(obj.asInstanceOf[Text].getBytes)
+      case ClassTag(_: LongWritable) => new LongWritable(obj.asInstanceOf[LongWritable].get)
+      case ClassTag(_: IntWritable) => new IntWritable(obj.asInstanceOf[IntWritable].get)
+      case ClassTag(_: NullWritable) => obj // TODO: should we clone this ?
+      case _ => WritableUtils.clone(obj.asInstanceOf[Writable], conf) // slower way of cloning.
+    }
+    cloned.asInstanceOf[T]
+  }
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
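The cloneWritables helper added above fast-paths Text, LongWritable, IntWritable and NullWritable and falls back to WritableUtils.clone for anything else, choosing the strategy from the ClassTag at the call site. Because Utils is private[spark], the following sketch only compiles from code inside the org.apache.spark package (a test suite, for instance); the object and values are hypothetical:

    package org.apache.spark

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.IntWritable
    import org.apache.spark.util.Utils

    object CloneWritablesSketch {
      def main(args: Array[String]) {
        val conf = new Configuration()
        val original = new IntWritable(42)
        // The ClassTag[IntWritable] at this call site selects the IntWritable fast path.
        val copy = Utils.cloneWritables(original, conf)
        original.set(7)
        println(copy.get)  // prints 42: the copy is independent of the reused instance
      }
    }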