update according to comments
This commit is contained in:
parent
4a9913d66a
commit
e179ff8a32
|
@ -16,6 +16,7 @@ import org.apache.hadoop.mapred.TextOutputFormat
|
|||
|
||||
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
|
||||
|
||||
import spark.broadcast.Broadcast
|
||||
import spark.Partitioner._
|
||||
import spark.partial.BoundedDouble
|
||||
import spark.partial.CountEvaluator
|
||||
|
@ -351,31 +352,93 @@ abstract class RDD[T: ClassManifest](
|
|||
/**
|
||||
* Return an RDD created by piping elements to a forked external process.
|
||||
*/
|
||||
def pipe(command: String, transform: (T,String => Unit) => Any, arguments: Seq[String]): RDD[String] =
|
||||
new PipedRDD(this, command, transform, arguments)
|
||||
|
||||
/**
|
||||
* Return an RDD created by piping elements to a forked external process.
|
||||
*/
|
||||
def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command)
|
||||
|
||||
/**
|
||||
* Return an RDD created by piping elements to a forked external process.
|
||||
*/
|
||||
def pipe(command: Seq[String], transform: (T,String => Unit) => Any, arguments: Seq[String]): RDD[String] =
|
||||
new PipedRDD(this, command, transform, arguments)
|
||||
|
||||
/**
|
||||
* Return an RDD created by piping elements to a forked external process.
|
||||
*/
|
||||
def pipe(command: Seq[String], env: Map[String, String]): RDD[String] =
|
||||
def pipe(command: String, env: Map[String, String]): RDD[String] =
|
||||
new PipedRDD(this, command, env)
|
||||
|
||||
/**
|
||||
* Return an RDD created by piping elements to a forked external process.
|
||||
* How each record in RDD is outputed to the process can be controled by providing a
|
||||
* function trasnform(T, outputFunction: String => Unit). transform() will be called with
|
||||
* the currnet record in RDD as the 1st parameter, and the function to output the record to
|
||||
* the external process (like out.println()) as the 2nd parameter.
|
||||
* Here's an example on how to pipe the RDD data of groupBy() in a streaming way,
|
||||
* instead of constructing a huge String to concat all the records:
|
||||
* def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
|
||||
* pipeContext can be used to transfer additional context data to the external process
|
||||
* besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to
|
||||
* external process with "^A" as the delimiter in the end of context data. Delimiter can also
|
||||
* be customized by the last parameter delimiter.
|
||||
*/
|
||||
def pipe(command: Seq[String], env: Map[String, String], transform: (T,String => Unit) => Any, arguments: Seq[String]): RDD[String] =
|
||||
new PipedRDD(this, command, env, transform, arguments)
|
||||
def pipe[U<: Seq[String]](
|
||||
command: String,
|
||||
env: Map[String, String],
|
||||
transform: (T,String => Unit) => Any,
|
||||
pipeContext: Broadcast[U],
|
||||
delimiter: String): RDD[String] =
|
||||
new PipedRDD(this, command, env, transform, pipeContext, delimiter)
|
||||
|
||||
/**
|
||||
* Return an RDD created by piping elements to a forked external process.
|
||||
* How each record in RDD is outputed to the process can be controled by providing a
|
||||
* function trasnform(T, outputFunction: String => Unit). transform() will be called with
|
||||
* the currnet record in RDD as the 1st parameter, and the function to output the record to
|
||||
* the external process (like out.println()) as the 2nd parameter.
|
||||
* Here's an example on how to pipe the RDD data of groupBy() in a streaming way,
|
||||
* instead of constructing a huge String to concat all the records:
|
||||
* def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
|
||||
* pipeContext can be used to transfer additional context data to the external process
|
||||
* besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to
|
||||
* external process with "^A" as the delimiter in the end of context data. Delimiter can also
|
||||
* be customized by the last parameter delimiter.
|
||||
*/
|
||||
def pipe[U<: Seq[String]](
|
||||
command: String,
|
||||
transform: (T,String => Unit) => Any,
|
||||
pipeContext: Broadcast[U]): RDD[String] =
|
||||
new PipedRDD(this, command, Map[String, String](), transform, pipeContext, "\u0001")
|
||||
|
||||
/**
|
||||
* Return an RDD created by piping elements to a forked external process.
|
||||
* How each record in RDD is outputed to the process can be controled by providing a
|
||||
* function trasnform(T, outputFunction: String => Unit). transform() will be called with
|
||||
* the currnet record in RDD as the 1st parameter, and the function to output the record to
|
||||
* the external process (like out.println()) as the 2nd parameter.
|
||||
* Here's an example on how to pipe the RDD data of groupBy() in a streaming way,
|
||||
* instead of constructing a huge String to concat all the records:
|
||||
* def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
|
||||
* pipeContext can be used to transfer additional context data to the external process
|
||||
* besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to
|
||||
* external process with "^A" as the delimiter in the end of context data. Delimiter can also
|
||||
* be customized by the last parameter delimiter.
|
||||
*/
|
||||
def pipe[U<: Seq[String]](
|
||||
command: String,
|
||||
env: Map[String, String],
|
||||
transform: (T,String => Unit) => Any,
|
||||
pipeContext: Broadcast[U]): RDD[String] =
|
||||
new PipedRDD(this, command, env, transform, pipeContext, "\u0001")
|
||||
|
||||
/**
|
||||
* Return an RDD created by piping elements to a forked external process.
|
||||
* How each record in RDD is outputed to the process can be controled by providing a
|
||||
* function trasnform(T, outputFunction: String => Unit). transform() will be called with
|
||||
* the currnet record in RDD as the 1st parameter, and the function to output the record to
|
||||
* the external process (like out.println()) as the 2nd parameter.
|
||||
* Here's an example on how to pipe the RDD data of groupBy() in a streaming way,
|
||||
* instead of constructing a huge String to concat all the records:
|
||||
* def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
|
||||
* pipeContext can be used to transfer additional context data to the external process
|
||||
* besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to
|
||||
* external process with "^A" as the delimiter in the end of context data. Delimiter can also
|
||||
* be customized by the last parameter delimiter.
|
||||
*/
|
||||
def pipe[U<: Seq[String]](
|
||||
command: Seq[String],
|
||||
env: Map[String, String] = Map(),
|
||||
transform: (T,String => Unit) => Any = null,
|
||||
pipeContext: Broadcast[U] = null,
|
||||
delimiter: String = "\u0001"): RDD[String] =
|
||||
new PipedRDD(this, command, env, transform, pipeContext, delimiter)
|
||||
|
||||
/**
|
||||
* Return a new RDD by applying a function to each partition of this RDD.
|
||||
|
|
|
@ -9,29 +9,33 @@ import scala.collection.mutable.ArrayBuffer
|
|||
import scala.io.Source
|
||||
|
||||
import spark.{RDD, SparkEnv, Partition, TaskContext}
|
||||
import spark.broadcast.Broadcast
|
||||
|
||||
|
||||
/**
|
||||
* An RDD that pipes the contents of each parent partition through an external command
|
||||
* (printing them one per line) and returns the output as a collection of strings.
|
||||
*/
|
||||
class PipedRDD[T: ClassManifest](
|
||||
class PipedRDD[T: ClassManifest, U <: Seq[String]](
|
||||
prev: RDD[T],
|
||||
command: Seq[String],
|
||||
envVars: Map[String, String],
|
||||
transform: (T, String => Unit) => Any,
|
||||
arguments: Seq[String]
|
||||
pipeContext: Broadcast[U],
|
||||
delimiter: String
|
||||
)
|
||||
extends RDD[String](prev) {
|
||||
|
||||
def this(prev: RDD[T], command: Seq[String], envVars : Map[String, String]) = this(prev, command, envVars, null, null)
|
||||
def this(prev: RDD[T], command: Seq[String]) = this(prev, command, Map())
|
||||
def this(prev: RDD[T], command: Seq[String], transform: (T,String => Unit) => Any, arguments: Seq[String]) = this(prev, command, Map(), transform, arguments)
|
||||
|
||||
// Similar to Runtime.exec(), if we are given a single string, split it into words
|
||||
// using a standard StringTokenizer (i.e. by spaces)
|
||||
def this(prev: RDD[T], command: String) = this(prev, PipedRDD.tokenize(command))
|
||||
def this(prev: RDD[T], command: String, transform: (T,String => Unit) => Any, arguments: Seq[String]) = this(prev, PipedRDD.tokenize(command), Map(), transform, arguments)
|
||||
def this(
|
||||
prev: RDD[T],
|
||||
command: String,
|
||||
envVars: Map[String, String] = Map(),
|
||||
transform: (T, String => Unit) => Any = null,
|
||||
pipeContext: Broadcast[U] = null,
|
||||
delimiter: String = "\u0001") =
|
||||
this(prev, PipedRDD.tokenize(command), envVars, transform, pipeContext, delimiter)
|
||||
|
||||
|
||||
override def getPartitions: Array[Partition] = firstParent[T].partitions
|
||||
|
@ -60,19 +64,18 @@ class PipedRDD[T: ClassManifest](
|
|||
SparkEnv.set(env)
|
||||
val out = new PrintWriter(proc.getOutputStream)
|
||||
|
||||
// input the arguments firstly
|
||||
if ( arguments != null) {
|
||||
for (elem <- arguments) {
|
||||
// input the pipeContext firstly
|
||||
if ( pipeContext != null) {
|
||||
for (elem <- pipeContext.value) {
|
||||
out.println(elem)
|
||||
}
|
||||
// ^A \n as the marker of the end of the arguments
|
||||
out.println("\u0001")
|
||||
// delimiter\n as the marker of the end of the pipeContext
|
||||
out.println(delimiter)
|
||||
}
|
||||
for (elem <- firstParent[T].iterator(split, context)) {
|
||||
if (transform != null) {
|
||||
transform(elem, out.println(_))
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
out.println(elem)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,8 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
|
|||
sc = new SparkContext("local", "test")
|
||||
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
|
||||
|
||||
val piped = nums.pipe(Seq("cat"), (i:Int, f: String=> Unit) => f(i + "_"), Array("0"))
|
||||
val piped = nums.pipe(Seq("cat"), Map[String, String](),
|
||||
(i:Int, f: String=> Unit) => f(i + "_"), sc.broadcast(List("0")))
|
||||
|
||||
val c = piped.collect()
|
||||
|
||||
|
@ -38,7 +39,9 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
|
|||
assert(c(7) === "4_")
|
||||
|
||||
val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
|
||||
val d = nums1.groupBy(str=>str.split("\t")(0)).pipe(Seq("cat"), (i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}, Array("0")).collect()
|
||||
val d = nums1.groupBy(str=>str.split("\t")(0)).
|
||||
pipe(Seq("cat"), Map[String, String](), (i:Tuple2[String, Seq[String]], f: String=> Unit) =>
|
||||
{for (e <- i._2){ f(e + "_")}}, sc.broadcast(List("0"))).collect()
|
||||
assert(d.size === 8)
|
||||
assert(d(0) === "0")
|
||||
assert(d(1) === "\u0001")
|
||||
|
|
Loading…
Reference in a new issue