Merge pull request #458 from tdas/docs-update
Updated java API docs for streaming, along with very minor changes in the code examples. Docs updated for: Scala: StreamingContext, DStream, PairDStreamFunctions Java: JavaStreamingContext, JavaDStream, JavaPairDStream Example updated: JavaQueueStream: Not use deprecated method ActorWordCount: Use the public interface the right way.
This commit is contained in:
commit
256a3553c4
|
@ -58,10 +58,9 @@ public final class JavaQueueStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < 30; i++) {
|
for (int i = 0; i < 30; i++) {
|
||||||
rddQueue.add(ssc.sc().parallelize(list));
|
rddQueue.add(ssc.sparkContext().parallelize(list));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Create the QueueInputDStream and use it do some processing
|
// Create the QueueInputDStream and use it do some processing
|
||||||
JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
|
JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
|
||||||
JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
|
JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
|
||||||
|
|
|
@ -88,7 +88,7 @@ extends Actor with Receiver {
|
||||||
override def preStart = remotePublisher ! SubscribeReceiver(context.self)
|
override def preStart = remotePublisher ! SubscribeReceiver(context.self)
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case msg ⇒ context.parent ! pushBlock(msg.asInstanceOf[T])
|
case msg ⇒ pushBlock(msg.asInstanceOf[T])
|
||||||
}
|
}
|
||||||
|
|
||||||
override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
|
override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
|
||||||
|
|
|
@ -42,9 +42,15 @@ import org.apache.spark.streaming.scheduler._
|
||||||
import org.apache.hadoop.conf.Configuration
|
import org.apache.hadoop.conf.Configuration
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
|
* Main entry point for Spark Streaming functionality. It provides methods used to create
|
||||||
* information (such as, cluster URL and job name) to internally create a SparkContext, it provides
|
* [[org.apache.spark.streaming.dstream.DStream]]s from various input sources. It can be either
|
||||||
* methods used to create DStream from various input sources.
|
* created by providing a Spark master URL and an appName, or from a org.apache.spark.SparkConf
|
||||||
|
* configuration (see core Spark documentation), or from an existing org.apache.spark.SparkContext.
|
||||||
|
* The associated SparkContext can be accessed using `context.sparkContext`. After
|
||||||
|
* creating and transforming DStreams, the streaming computation can be started and stopped
|
||||||
|
* using `context.start()` and `context.stop()`, respectively.
|
||||||
|
* `context.awaitTransformation()` allows the current thread to wait for the termination
|
||||||
|
* of the context by `stop()` or by an exception.
|
||||||
*/
|
*/
|
||||||
class StreamingContext private[streaming] (
|
class StreamingContext private[streaming] (
|
||||||
sc_ : SparkContext,
|
sc_ : SparkContext,
|
||||||
|
@ -63,7 +69,7 @@ class StreamingContext private[streaming] (
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a StreamingContext by providing the configuration necessary for a new SparkContext.
|
* Create a StreamingContext by providing the configuration necessary for a new SparkContext.
|
||||||
* @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters
|
* @param conf a org.apache.spark.SparkConf object specifying Spark parameters
|
||||||
* @param batchDuration the time interval at which streaming data will be divided into batches
|
* @param batchDuration the time interval at which streaming data will be divided into batches
|
||||||
*/
|
*/
|
||||||
def this(conf: SparkConf, batchDuration: Duration) = {
|
def this(conf: SparkConf, batchDuration: Duration) = {
|
||||||
|
@ -88,7 +94,7 @@ class StreamingContext private[streaming] (
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Re-create a StreamingContext from a checkpoint file.
|
* Recreate a StreamingContext from a checkpoint file.
|
||||||
* @param path Path to the directory that was specified as the checkpoint directory
|
* @param path Path to the directory that was specified as the checkpoint directory
|
||||||
* @param hadoopConf Optional, configuration object if necessary for reading from
|
* @param hadoopConf Optional, configuration object if necessary for reading from
|
||||||
* HDFS compatible filesystems
|
* HDFS compatible filesystems
|
||||||
|
@ -151,6 +157,7 @@ class StreamingContext private[streaming] (
|
||||||
private[streaming] val scheduler = new JobScheduler(this)
|
private[streaming] val scheduler = new JobScheduler(this)
|
||||||
|
|
||||||
private[streaming] val waiter = new ContextWaiter
|
private[streaming] val waiter = new ContextWaiter
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the associated Spark context
|
* Return the associated Spark context
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -27,22 +27,12 @@ import scala.reflect.ClassTag
|
||||||
import org.apache.spark.streaming.dstream.DStream
|
import org.apache.spark.streaming.dstream.DStream
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
|
* A Java-friendly interface to [[org.apache.spark.streaming.dstream.DStream]], the basic
|
||||||
* sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
|
* abstraction in Spark Streaming that represents a continuous stream of data.
|
||||||
* for more details on RDDs). DStreams can either be created from live data (such as, data from
|
* DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
|
||||||
* HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
|
* etc.) or it can be generated by transforming existing DStreams using operations such as `map`,
|
||||||
* such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
|
* `window`. For operations applicable to key-value pair DStreams, see
|
||||||
* DStream periodically generates a RDD, either from live data or by transforming the RDD generated
|
* [[org.apache.spark.streaming.api.java.JavaPairDStream]].
|
||||||
* by a parent DStream.
|
|
||||||
*
|
|
||||||
* This class contains the basic operations available on all DStreams, such as `map`, `filter` and
|
|
||||||
* `window`. In addition, [[org.apache.spark.streaming.api.java.JavaPairDStream]] contains operations available
|
|
||||||
* only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`.
|
|
||||||
*
|
|
||||||
* DStreams internally is characterized by a few basic properties:
|
|
||||||
* - A list of other DStreams that the DStream depends on
|
|
||||||
* - A time interval at which the DStream generates an RDD
|
|
||||||
* - A function that is used to generate an RDD after each time interval
|
|
||||||
*/
|
*/
|
||||||
class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T])
|
class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T])
|
||||||
extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
|
extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
|
||||||
|
|
|
@ -37,6 +37,10 @@ import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.rdd.PairRDDFunctions
|
import org.apache.spark.rdd.PairRDDFunctions
|
||||||
import org.apache.spark.streaming.dstream.DStream
|
import org.apache.spark.streaming.dstream.DStream
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Java-friendly interface to a DStream of key-value pairs, which provides extra methods
|
||||||
|
* like `reduceByKey` and `join`.
|
||||||
|
*/
|
||||||
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
|
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
|
||||||
implicit val kManifest: ClassTag[K],
|
implicit val kManifest: ClassTag[K],
|
||||||
implicit val vManifest: ClassTag[V])
|
implicit val vManifest: ClassTag[V])
|
||||||
|
|
|
@ -22,7 +22,6 @@ import scala.collection.JavaConversions._
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
|
|
||||||
import java.io.InputStream
|
import java.io.InputStream
|
||||||
import java.lang.{Integer => JInt}
|
|
||||||
import java.util.{List => JList, Map => JMap}
|
import java.util.{List => JList, Map => JMap}
|
||||||
|
|
||||||
import akka.actor.{Props, SupervisorStrategy}
|
import akka.actor.{Props, SupervisorStrategy}
|
||||||
|
@ -39,19 +38,20 @@ import org.apache.hadoop.conf.Configuration
|
||||||
import org.apache.spark.streaming.dstream.DStream
|
import org.apache.spark.streaming.dstream.DStream
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
|
* A Java-friendly version of [[org.apache.spark.streaming.StreamingContext]] which is the main
|
||||||
* information (such as, cluster URL and job name) to internally create a SparkContext, it provides
|
* entry point for Spark Streaming functionality. It provides methods to create
|
||||||
* methods used to create DStream from various input sources.
|
* [[org.apache.spark.streaming.api.java.JavaDStream]] and
|
||||||
|
* [[org.apache.spark.streaming.api.java.JavaPairDStream.]] from input sources. The internal
|
||||||
|
* org.apache.spark.api.java.JavaSparkContext (see core Spark documentation) can be accessed
|
||||||
|
* using `context.sparkContext`. After creating and transforming DStreams, the streaming
|
||||||
|
* computation can be started and stopped using `context.start()` and `context.stop()`,
|
||||||
|
* respectively. `context.awaitTransformation()` allows the current thread to wait for the
|
||||||
|
* termination of a context by `stop()` or by an exception.
|
||||||
*/
|
*/
|
||||||
class JavaStreamingContext(val ssc: StreamingContext) {
|
class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
|
|
||||||
// TODOs:
|
|
||||||
// - Test to/from Hadoop functions
|
|
||||||
// - Support creating and registering InputStreams
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a StreamingContext.
|
* Create a StreamingContext.
|
||||||
* @param master Name of the Spark Master
|
* @param master Name of the Spark Master
|
||||||
* @param appName Name to be used when registering with the scheduler
|
* @param appName Name to be used when registering with the scheduler
|
||||||
* @param batchDuration The time interval at which streaming data will be divided into batches
|
* @param batchDuration The time interval at which streaming data will be divided into batches
|
||||||
|
@ -60,7 +60,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
this(new StreamingContext(master, appName, batchDuration, null, Nil, Map()))
|
this(new StreamingContext(master, appName, batchDuration, null, Nil, Map()))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a StreamingContext.
|
* Create a StreamingContext.
|
||||||
* @param master Name of the Spark Master
|
* @param master Name of the Spark Master
|
||||||
* @param appName Name to be used when registering with the scheduler
|
* @param appName Name to be used when registering with the scheduler
|
||||||
* @param batchDuration The time interval at which streaming data will be divided into batches
|
* @param batchDuration The time interval at which streaming data will be divided into batches
|
||||||
|
@ -77,7 +77,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
this(new StreamingContext(master, appName, batchDuration, sparkHome, Seq(jarFile), Map()))
|
this(new StreamingContext(master, appName, batchDuration, sparkHome, Seq(jarFile), Map()))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a StreamingContext.
|
* Create a StreamingContext.
|
||||||
* @param master Name of the Spark Master
|
* @param master Name of the Spark Master
|
||||||
* @param appName Name to be used when registering with the scheduler
|
* @param appName Name to be used when registering with the scheduler
|
||||||
* @param batchDuration The time interval at which streaming data will be divided into batches
|
* @param batchDuration The time interval at which streaming data will be divided into batches
|
||||||
|
@ -94,7 +94,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, Map()))
|
this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, Map()))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a StreamingContext.
|
* Create a StreamingContext.
|
||||||
* @param master Name of the Spark Master
|
* @param master Name of the Spark Master
|
||||||
* @param appName Name to be used when registering with the scheduler
|
* @param appName Name to be used when registering with the scheduler
|
||||||
* @param batchDuration The time interval at which streaming data will be divided into batches
|
* @param batchDuration The time interval at which streaming data will be divided into batches
|
||||||
|
@ -113,7 +113,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, environment))
|
this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, environment))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a StreamingContext using an existing SparkContext.
|
* Create a JavaStreamingContext using an existing JavaSparkContext.
|
||||||
* @param sparkContext The underlying JavaSparkContext to use
|
* @param sparkContext The underlying JavaSparkContext to use
|
||||||
* @param batchDuration The time interval at which streaming data will be divided into batches
|
* @param batchDuration The time interval at which streaming data will be divided into batches
|
||||||
*/
|
*/
|
||||||
|
@ -121,7 +121,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
this(new StreamingContext(sparkContext.sc, batchDuration))
|
this(new StreamingContext(sparkContext.sc, batchDuration))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a StreamingContext using an existing SparkContext.
|
* Create a JavaStreamingContext using a SparkConf configuration.
|
||||||
* @param conf A Spark application configuration
|
* @param conf A Spark application configuration
|
||||||
* @param batchDuration The time interval at which streaming data will be divided into batches
|
* @param batchDuration The time interval at which streaming data will be divided into batches
|
||||||
*/
|
*/
|
||||||
|
@ -129,19 +129,18 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
this(new StreamingContext(conf, batchDuration))
|
this(new StreamingContext(conf, batchDuration))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Re-creates a StreamingContext from a checkpoint file.
|
* Recreate a JavaStreamingContext from a checkpoint file.
|
||||||
* @param path Path to the directory that was specified as the checkpoint directory
|
* @param path Path to the directory that was specified as the checkpoint directory
|
||||||
*/
|
*/
|
||||||
def this(path: String) = this(new StreamingContext(path, new Configuration))
|
def this(path: String) = this(new StreamingContext(path, new Configuration))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Re-creates a StreamingContext from a checkpoint file.
|
* Re-creates a JavaStreamingContext from a checkpoint file.
|
||||||
* @param path Path to the directory that was specified as the checkpoint directory
|
* @param path Path to the directory that was specified as the checkpoint directory
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
def this(path: String, hadoopConf: Configuration) = this(new StreamingContext(path, hadoopConf))
|
def this(path: String, hadoopConf: Configuration) = this(new StreamingContext(path, hadoopConf))
|
||||||
|
|
||||||
|
|
||||||
@deprecated("use sparkContext", "0.9.0")
|
@deprecated("use sparkContext", "0.9.0")
|
||||||
val sc: JavaSparkContext = sparkContext
|
val sc: JavaSparkContext = sparkContext
|
||||||
|
|
||||||
|
@ -149,7 +148,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
val sparkContext = new JavaSparkContext(ssc.sc)
|
val sparkContext = new JavaSparkContext(ssc.sc)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a input stream from network source hostname:port. Data is received using
|
* Create an input stream from network source hostname:port. Data is received using
|
||||||
* a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
|
* a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
|
||||||
* lines.
|
* lines.
|
||||||
* @param hostname Hostname to connect to for receiving data
|
* @param hostname Hostname to connect to for receiving data
|
||||||
|
@ -162,7 +161,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a input stream from network source hostname:port. Data is received using
|
* Create an input stream from network source hostname:port. Data is received using
|
||||||
* a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
|
* a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
|
||||||
* lines. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
|
* lines. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
|
||||||
* @param hostname Hostname to connect to for receiving data
|
* @param hostname Hostname to connect to for receiving data
|
||||||
|
@ -173,7 +172,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a input stream from network source hostname:port. Data is received using
|
* Create an input stream from network source hostname:port. Data is received using
|
||||||
* a TCP socket and the receive bytes it interepreted as object using the given
|
* a TCP socket and the receive bytes it interepreted as object using the given
|
||||||
* converter.
|
* converter.
|
||||||
* @param hostname Hostname to connect to for receiving data
|
* @param hostname Hostname to connect to for receiving data
|
||||||
|
@ -195,7 +194,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a input stream that monitors a Hadoop-compatible filesystem
|
* Create an input stream that monitors a Hadoop-compatible filesystem
|
||||||
* for new files and reads them as text files (using key as LongWritable, value
|
* for new files and reads them as text files (using key as LongWritable, value
|
||||||
* as Text and input format as TextInputFormat). Files must be written to the
|
* as Text and input format as TextInputFormat). Files must be written to the
|
||||||
* monitored directory by "moving" them from another location within the same
|
* monitored directory by "moving" them from another location within the same
|
||||||
|
@ -207,7 +206,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a input stream from network source hostname:port, where data is received
|
* Create an input stream from network source hostname:port, where data is received
|
||||||
* as serialized blocks (serialized using the Spark's serializer) that can be directly
|
* as serialized blocks (serialized using the Spark's serializer) that can be directly
|
||||||
* pushed into the block manager without deserializing them. This is the most efficient
|
* pushed into the block manager without deserializing them. This is the most efficient
|
||||||
* way to receive data.
|
* way to receive data.
|
||||||
|
@ -226,7 +225,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a input stream from network source hostname:port, where data is received
|
* Create an input stream from network source hostname:port, where data is received
|
||||||
* as serialized blocks (serialized using the Spark's serializer) that can be directly
|
* as serialized blocks (serialized using the Spark's serializer) that can be directly
|
||||||
* pushed into the block manager without deserializing them. This is the most efficient
|
* pushed into the block manager without deserializing them. This is the most efficient
|
||||||
* way to receive data.
|
* way to receive data.
|
||||||
|
@ -241,7 +240,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a input stream that monitors a Hadoop-compatible filesystem
|
* Create an input stream that monitors a Hadoop-compatible filesystem
|
||||||
* for new files and reads them using the given key-value types and input format.
|
* for new files and reads them using the given key-value types and input format.
|
||||||
* Files must be written to the monitored directory by "moving" them from another
|
* Files must be written to the monitored directory by "moving" them from another
|
||||||
* location within the same file system. File names starting with . are ignored.
|
* location within the same file system. File names starting with . are ignored.
|
||||||
|
@ -324,7 +323,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a input stream from an queue of RDDs. In each batch,
|
* Creates an input stream from an queue of RDDs. In each batch,
|
||||||
* it will process either one or all of the RDDs returned by the queue.
|
* it will process either one or all of the RDDs returned by the queue.
|
||||||
*
|
*
|
||||||
* NOTE: changes to the queue after the stream is created will not be recognized.
|
* NOTE: changes to the queue after the stream is created will not be recognized.
|
||||||
|
@ -340,7 +339,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a input stream from an queue of RDDs. In each batch,
|
* Creates an input stream from an queue of RDDs. In each batch,
|
||||||
* it will process either one or all of the RDDs returned by the queue.
|
* it will process either one or all of the RDDs returned by the queue.
|
||||||
*
|
*
|
||||||
* NOTE: changes to the queue after the stream is created will not be recognized.
|
* NOTE: changes to the queue after the stream is created will not be recognized.
|
||||||
|
@ -357,7 +356,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a input stream from an queue of RDDs. In each batch,
|
* Creates an input stream from an queue of RDDs. In each batch,
|
||||||
* it will process either one or all of the RDDs returned by the queue.
|
* it will process either one or all of the RDDs returned by the queue.
|
||||||
*
|
*
|
||||||
* NOTE: changes to the queue after the stream is created will not be recognized.
|
* NOTE: changes to the queue after the stream is created will not be recognized.
|
||||||
|
|
|
@ -37,8 +37,9 @@ import org.apache.spark.streaming.Duration
|
||||||
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
|
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
|
||||||
* sequence of RDDs (of the same type) representing a continuous stream of data (see
|
* sequence of RDDs (of the same type) representing a continuous stream of data (see
|
||||||
* org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
|
* org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
|
||||||
* DStreams can either be created from live data (such as, data from Kafka, Flume, sockets, HDFS)
|
* DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
|
||||||
* or it can be generated by transforming existing DStreams using operations such as `map`,
|
* etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
|
||||||
|
* transforming existing DStreams using operations such as `map`,
|
||||||
* `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
|
* `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
|
||||||
* periodically generates a RDD, either from live data or by transforming the RDD generated by a
|
* periodically generates a RDD, either from live data or by transforming the RDD generated by a
|
||||||
* parent DStream.
|
* parent DStream.
|
||||||
|
@ -540,7 +541,6 @@ abstract class DStream[T: ClassTag] (
|
||||||
* on each RDD of 'this' DStream.
|
* on each RDD of 'this' DStream.
|
||||||
*/
|
*/
|
||||||
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
|
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
|
||||||
//new TransformedDStream(this, context.sparkContext.clean(transformFunc))
|
|
||||||
val cleanedF = context.sparkContext.clean(transformFunc)
|
val cleanedF = context.sparkContext.clean(transformFunc)
|
||||||
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
|
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
|
||||||
assert(rdds.length == 1)
|
assert(rdds.length == 1)
|
||||||
|
|
|
@ -18,20 +18,17 @@
|
||||||
package org.apache.spark.streaming.dstream
|
package org.apache.spark.streaming.dstream
|
||||||
|
|
||||||
import org.apache.spark.streaming.StreamingContext._
|
import org.apache.spark.streaming.StreamingContext._
|
||||||
import org.apache.spark.streaming.dstream._
|
|
||||||
|
|
||||||
import org.apache.spark.{Partitioner, HashPartitioner}
|
import org.apache.spark.{Partitioner, HashPartitioner}
|
||||||
import org.apache.spark.SparkContext._
|
import org.apache.spark.SparkContext._
|
||||||
import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions}
|
import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.storage.StorageLevel
|
|
||||||
|
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
import scala.reflect.{ClassTag, classTag}
|
import scala.reflect.ClassTag
|
||||||
|
|
||||||
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
|
import org.apache.hadoop.mapred.JobConf
|
||||||
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
|
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
|
||||||
import org.apache.hadoop.mapred.OutputFormat
|
import org.apache.hadoop.mapred.OutputFormat
|
||||||
import org.apache.hadoop.security.UserGroupInformation
|
|
||||||
import org.apache.hadoop.conf.Configuration
|
import org.apache.hadoop.conf.Configuration
|
||||||
import org.apache.spark.streaming.{Time, Duration}
|
import org.apache.spark.streaming.{Time, Duration}
|
||||||
|
|
||||||
|
@ -108,7 +105,7 @@ extends Serializable {
|
||||||
/**
|
/**
|
||||||
* Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
|
* Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
|
||||||
* combineByKey for RDDs. Please refer to combineByKey in
|
* combineByKey for RDDs. Please refer to combineByKey in
|
||||||
* [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
|
* org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information.
|
||||||
*/
|
*/
|
||||||
def combineByKey[C: ClassTag](
|
def combineByKey[C: ClassTag](
|
||||||
createCombiner: V => C,
|
createCombiner: V => C,
|
||||||
|
|
|
@ -44,40 +44,49 @@ object ReceiverSupervisorStrategy {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A receiver trait to be mixed in with your Actor to gain access to
|
* A receiver trait to be mixed in with your Actor to gain access to
|
||||||
* pushBlock API.
|
* the API for pushing received data into Spark Streaming for being processed.
|
||||||
*
|
*
|
||||||
* Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
|
* Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
|
||||||
*
|
*
|
||||||
* @example {{{
|
* @example {{{
|
||||||
* class MyActor extends Actor with Receiver{
|
* class MyActor extends Actor with Receiver{
|
||||||
* def receive {
|
* def receive {
|
||||||
* case anything :String => pushBlock(anything)
|
* case anything: String => pushBlock(anything)
|
||||||
* }
|
* }
|
||||||
* }
|
* }
|
||||||
* //Can be plugged in actorStream as follows
|
*
|
||||||
|
* // Can be used with an actorStream as follows
|
||||||
* ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
|
* ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
|
||||||
*
|
*
|
||||||
* }}}
|
* }}}
|
||||||
*
|
*
|
||||||
* @note An important point to note:
|
* @note Since Actor may exist outside the spark framework, It is thus user's responsibility
|
||||||
* Since Actor may exist outside the spark framework, It is thus user's responsibility
|
|
||||||
* to ensure the type safety, i.e parametrized type of push block and InputDStream
|
* to ensure the type safety, i.e parametrized type of push block and InputDStream
|
||||||
* should be same.
|
* should be same.
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
trait Receiver { self: Actor ⇒
|
trait Receiver {
|
||||||
|
|
||||||
|
self: Actor ⇒ // to ensure that this can be added to Actor classes only
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Push an iterator received data into Spark Streaming for processing
|
||||||
|
*/
|
||||||
def pushBlock[T: ClassTag](iter: Iterator[T]) {
|
def pushBlock[T: ClassTag](iter: Iterator[T]) {
|
||||||
context.parent ! Data(iter)
|
context.parent ! Data(iter)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Push a single item of received data into Spark Streaming for processing
|
||||||
|
*/
|
||||||
def pushBlock[T: ClassTag](data: T) {
|
def pushBlock[T: ClassTag](data: T) {
|
||||||
context.parent ! Data(data)
|
context.parent ! Data(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Statistics for querying the supervisor about state of workers
|
* Statistics for querying the supervisor about state of workers. Used in
|
||||||
|
* conjunction with `StreamingContext.actorStream` and
|
||||||
|
* [[org.apache.spark.streaming.receivers.Receiver]].
|
||||||
*/
|
*/
|
||||||
case class Statistics(numberOfMsgs: Int,
|
case class Statistics(numberOfMsgs: Int,
|
||||||
numberOfWorkers: Int,
|
numberOfWorkers: Int,
|
||||||
|
@ -96,17 +105,15 @@ private[streaming] case class Data[T: ClassTag](data: T)
|
||||||
* his own Actor to run as receiver for Spark Streaming input source.
|
* his own Actor to run as receiver for Spark Streaming input source.
|
||||||
*
|
*
|
||||||
* This starts a supervisor actor which starts workers and also provides
|
* This starts a supervisor actor which starts workers and also provides
|
||||||
* [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
|
* [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance].
|
||||||
*
|
*
|
||||||
* Here's a way to start more supervisor/workers as its children.
|
* Here's a way to start more supervisor/workers as its children.
|
||||||
*
|
*
|
||||||
* @example {{{
|
* @example {{{
|
||||||
* context.parent ! Props(new Supervisor)
|
* context.parent ! Props(new Supervisor)
|
||||||
* }}} OR {{{
|
* }}} OR {{{
|
||||||
* context.parent ! Props(new Worker,"Worker")
|
* context.parent ! Props(new Worker, "Worker")
|
||||||
* }}}
|
* }}}
|
||||||
*
|
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
private[streaming] class ActorReceiver[T: ClassTag](
|
private[streaming] class ActorReceiver[T: ClassTag](
|
||||||
props: Props,
|
props: Props,
|
||||||
|
|
Loading…
Reference in a new issue