Added save operations to DStreams.

Tathagata Das 2012-10-27 18:55:50 -07:00
parent 650d717544
commit 1b900183c8
3 changed files with 84 additions and 3 deletions

streaming/src/main/scala/spark/streaming/DStream.scala

@@ -363,6 +363,22 @@ extends Serializable with Logging {
    rdds.toSeq
  }

  // Save each RDD in this DStream as a SequenceFile of serialized objects.
  def saveAsObjectFiles(prefix: String, suffix: String = "") {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsObjectFile(file)
    }
    this.foreachRDD(saveFunc)
  }

  // Save each RDD in this DStream as a text file, using the string
  // representation of the elements.
  def saveAsTextFiles(prefix: String, suffix: String = "") {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsTextFile(file)
    }
    this.foreachRDD(saveFunc)
  }

  def register() {
    ssc.registerOutputStream(this)
  }
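With these two methods in place, writing a stream out is a one-liner per batch. A minimal usage sketch (the socket source and names are hypothetical; a StreamingContext `ssc` of this era is assumed):

// Hypothetical stream of log lines read from a socket.
val lines: DStream[String] = ssc.networkTextStream("localhost", 9999)

// Each batch interval writes its RDD under a time-stamped path,
// e.g. "logs-<batchTimeMs>.txt" (see rddToFileName below).
lines.saveAsTextFiles("logs", "txt")
lines.saveAsObjectFiles("events")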

streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala

@@ -1,9 +1,16 @@
package spark.streaming

import spark.streaming.StreamingContext._

import spark.{Manifests, RDD, Partitioner, HashPartitioner}
import spark.SparkContext._

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration

class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K, V)])
extends Serializable {
@@ -231,6 +238,54 @@ extends Serializable {
      for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
    }
  }

  // Save each RDD in this DStream as a Hadoop file (old mapred API). The
  // key, value and output-format classes are recovered from the manifests.
  def saveAsHadoopFiles[F <: OutputFormat[K, V]](
      prefix: String,
      suffix: String
    )(implicit fm: ClassManifest[F]) {
    saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
  }

  def saveAsHadoopFiles(
      prefix: String,
      suffix: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      conf: JobConf = new JobConf
    ) {
    val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
    }
    self.foreachRDD(saveFunc)
  }

  // Same as above, but through the new Hadoop API (org.apache.hadoop.mapreduce).
  def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
      prefix: String,
      suffix: String
    )(implicit fm: ClassManifest[F]) {
    saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
  }

  def saveAsNewAPIHadoopFiles(
      prefix: String,
      suffix: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
      conf: Configuration = new Configuration
    ) {
    val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
    }
    self.foreachRDD(saveFunc)
  }

  private def getKeyClass() = implicitly[ClassManifest[K]].erasure

  private def getValueClass() = implicitly[ClassManifest[V]].erasure
}
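The manifest-based overloads mean callers name only the output format; the key and value classes come from the DStream's own type parameters. A sketch of old-API usage (the word-count stream is hypothetical):

import org.apache.hadoop.mapred.TextOutputFormat

// Hypothetical DStream[(String, Int)] of per-batch word counts.
val wordCounts = words.map(w => (w, 1)).reduceByKey(_ + _)

// TextOutputFormat writes "key<TAB>value" lines; each batch lands in
// "counts-<batchTimeMs>.out".
wordCounts.saveAsHadoopFiles[TextOutputFormat[String, Int]]("counts", "out")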

streaming/src/main/scala/spark/streaming/StreamingContext.scala

@@ -225,5 +225,15 @@ object StreamingContext {
  implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K, V)]) = {
    new PairDStreamFunctions[K, V](stream)
  }

  // Build the output path for one batch: "<prefix>-<batchTimeMs>[.<suffix>]",
  // or just the batch time in milliseconds if no prefix is given.
  def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
    if (prefix == null) {
      time.milliseconds.toString
    } else if (suffix == null || suffix.length == 0) {
      prefix + "-" + time.milliseconds
    } else {
      prefix + "-" + time.milliseconds + "." + suffix
    }
  }
}
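Worked through, the naming scheme looks like this (the batch time is hypothetical, and Time's millisecond constructor is assumed):

val t = Time(1351382400000L)                        // hypothetical batch time
StreamingContext.rddToFileName("counts", "txt", t)  // "counts-1351382400000.txt"
StreamingContext.rddToFileName("counts", "", t)     // "counts-1351382400000"
StreamingContext.rddToFileName(null, "txt", t)      // "1351382400000"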