diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java index 7ef9c6c8f4..e2d55f1a4e 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java @@ -58,10 +58,9 @@ public final class JavaQueueStream { } for (int i = 0; i < 30; i++) { - rddQueue.add(ssc.sc().parallelize(list)); + rddQueue.add(ssc.sparkContext().parallelize(list)); } - // Create the QueueInputDStream and use it do some processing JavaDStream inputStream = ssc.queueStream(rddQueue); JavaPairDStream mappedStream = inputStream.map( diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala index 57e1b1f806..5a4aa7f3a2 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala @@ -88,7 +88,7 @@ extends Actor with Receiver { override def preStart = remotePublisher ! SubscribeReceiver(context.self) def receive = { - case msg ⇒ context.parent ! pushBlock(msg.asInstanceOf[T]) + case msg ⇒ pushBlock(msg.asInstanceOf[T]) } override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 26257e652e..5847b95e3f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -42,9 +42,15 @@ import org.apache.spark.streaming.scheduler._ import org.apache.hadoop.conf.Configuration /** - * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic - * information (such as, cluster URL and job name) to internally create a SparkContext, it provides - * methods used to create DStream from various input sources. + * Main entry point for Spark Streaming functionality. It provides methods used to create + * [[org.apache.spark.streaming.dstream.DStream]]s from various input sources. It can be created + * either by providing a Spark master URL and an appName, or from an org.apache.spark.SparkConf + * configuration (see core Spark documentation), or from an existing org.apache.spark.SparkContext. + * The associated SparkContext can be accessed using `context.sparkContext`. After + * creating and transforming DStreams, the streaming computation can be started and stopped + * using `context.start()` and `context.stop()`, respectively. + * `context.awaitTermination()` allows the current thread to wait for the termination + * of the context by `stop()` or by an exception. */ class StreamingContext private[streaming] ( sc_ : SparkContext, @@ -63,7 +69,7 @@ class StreamingContext private[streaming] ( /** * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
- * @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters + * @param conf a org.apache.spark.SparkConf object specifying Spark parameters * @param batchDuration the time interval at which streaming data will be divided into batches */ def this(conf: SparkConf, batchDuration: Duration) = { @@ -88,7 +94,7 @@ class StreamingContext private[streaming] ( } /** - * Re-create a StreamingContext from a checkpoint file. + * Recreate a StreamingContext from a checkpoint file. * @param path Path to the directory that was specified as the checkpoint directory * @param hadoopConf Optional, configuration object if necessary for reading from * HDFS compatible filesystems @@ -151,6 +157,7 @@ class StreamingContext private[streaming] ( private[streaming] val scheduler = new JobScheduler(this) private[streaming] val waiter = new ContextWaiter + /** * Return the associated Spark context */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala index c92854ccd9..e23b725052 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala @@ -27,22 +27,12 @@ import scala.reflect.ClassTag import org.apache.spark.streaming.dstream.DStream /** - * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous - * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]] - * for more details on RDDs). DStreams can either be created from live data (such as, data from - * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations - * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each - * DStream periodically generates a RDD, either from live data or by transforming the RDD generated - * by a parent DStream. - * - * This class contains the basic operations available on all DStreams, such as `map`, `filter` and - * `window`. In addition, [[org.apache.spark.streaming.api.java.JavaPairDStream]] contains operations available - * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. - * - * DStreams internally is characterized by a few basic properties: - * - A list of other DStreams that the DStream depends on - * - A time interval at which the DStream generates an RDD - * - A function that is used to generate an RDD after each time interval + * A Java-friendly interface to [[org.apache.spark.streaming.dstream.DStream]], the basic + * abstraction in Spark Streaming that represents a continuous stream of data. + * DStreams can either be created from live data (such as data from TCP sockets, Kafka, Flume, + * etc.) or be generated by transforming existing DStreams using operations such as `map` and + * `window`. For operations applicable to key-value pair DStreams, see + * [[org.apache.spark.streaming.api.java.JavaPairDStream]].
*/ class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T]) extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 6bb985ca54..79fa6a623d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -37,6 +37,10 @@ import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PairRDDFunctions import org.apache.spark.streaming.dstream.DStream +/** + * A Java-friendly interface to a DStream of key-value pairs, which provides extra methods + * like `reduceByKey` and `join`. + */ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( implicit val kManifest: ClassTag[K], implicit val vManifest: ClassTag[V]) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 613683ca40..921b56143a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -22,7 +22,6 @@ import scala.collection.JavaConversions._ import scala.reflect.ClassTag import java.io.InputStream -import java.lang.{Integer => JInt} import java.util.{List => JList, Map => JMap} import akka.actor.{Props, SupervisorStrategy} @@ -39,19 +38,20 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.streaming.dstream.DStream /** - * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic - * information (such as, cluster URL and job name) to internally create a SparkContext, it provides - * methods used to create DStream from various input sources. + * A Java-friendly version of [[org.apache.spark.streaming.StreamingContext]] which is the main + * entry point for Spark Streaming functionality. It provides methods to create + * [[org.apache.spark.streaming.api.java.JavaDStream]] and + * [[org.apache.spark.streaming.api.java.JavaPairDStream]] from input sources. The internal + * org.apache.spark.api.java.JavaSparkContext (see core Spark documentation) can be accessed + * using `context.sparkContext`. After creating and transforming DStreams, the streaming + * computation can be started and stopped using `context.start()` and `context.stop()`, + * respectively. `context.awaitTermination()` allows the current thread to wait for the + * termination of the context by `stop()` or by an exception. */ class JavaStreamingContext(val ssc: StreamingContext) { - // TODOs: - // - Test to/from Hadoop functions - // - Support creating and registering InputStreams - - /** - * Creates a StreamingContext. + * Create a StreamingContext. * @param master Name of the Spark Master * @param appName Name to be used when registering with the scheduler * @param batchDuration The time interval at which streaming data will be divided into batches @@ -60,7 +60,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { this(new StreamingContext(master, appName, batchDuration, null, Nil, Map())) /** - * Creates a StreamingContext. + * Create a StreamingContext.
 * @param master Name of the Spark Master * @param appName Name to be used when registering with the scheduler * @param batchDuration The time interval at which streaming data will be divided into batches @@ -77,7 +77,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { this(new StreamingContext(master, appName, batchDuration, sparkHome, Seq(jarFile), Map())) /** - * Creates a StreamingContext. + * Create a StreamingContext. * @param master Name of the Spark Master * @param appName Name to be used when registering with the scheduler * @param batchDuration The time interval at which streaming data will be divided into batches @@ -94,7 +94,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, Map())) /** - * Creates a StreamingContext. + * Create a StreamingContext. * @param master Name of the Spark Master * @param appName Name to be used when registering with the scheduler * @param batchDuration The time interval at which streaming data will be divided into batches @@ -113,7 +113,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, environment)) /** - * Creates a StreamingContext using an existing SparkContext. + * Create a JavaStreamingContext using an existing JavaSparkContext. * @param sparkContext The underlying JavaSparkContext to use * @param batchDuration The time interval at which streaming data will be divided into batches */ @@ -121,7 +121,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { this(new StreamingContext(sparkContext.sc, batchDuration)) /** - * Creates a StreamingContext using an existing SparkContext. + * Create a JavaStreamingContext using a SparkConf configuration. * @param conf A Spark application configuration * @param batchDuration The time interval at which streaming data will be divided into batches */ @@ -129,19 +129,18 @@ class JavaStreamingContext(val ssc: StreamingContext) { this(new StreamingContext(conf, batchDuration)) /** - * Re-creates a StreamingContext from a checkpoint file. + * Recreate a JavaStreamingContext from a checkpoint file. * @param path Path to the directory that was specified as the checkpoint directory */ def this(path: String) = this(new StreamingContext(path, new Configuration)) /** - * Re-creates a StreamingContext from a checkpoint file. + * Recreate a JavaStreamingContext from a checkpoint file. * @param path Path to the directory that was specified as the checkpoint directory * */ def this(path: String, hadoopConf: Configuration) = this(new StreamingContext(path, hadoopConf)) - @deprecated("use sparkContext", "0.9.0") val sc: JavaSparkContext = sparkContext @@ -149,7 +148,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { val sparkContext = new JavaSparkContext(ssc.sc) /** - * Create a input stream from network source hostname:port. Data is received using + * Create an input stream from network source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited * lines. * @param hostname Hostname to connect to for receiving data @@ -162,7 +161,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Create a input stream from network source hostname:port. Data is received using + * Create an input stream from network source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited * lines.
Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param hostname Hostname to connect to for receiving data @@ -173,7 +172,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Create a input stream from network source hostname:port. Data is received using + * Create an input stream from network source hostname:port. Data is received using * a TCP socket and the receive bytes it interepreted as object using the given * converter. * @param hostname Hostname to connect to for receiving data @@ -195,7 +194,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Create a input stream that monitors a Hadoop-compatible filesystem + * Create an input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as text files (using key as LongWritable, value * as Text and input format as TextInputFormat). Files must be written to the * monitored directory by "moving" them from another location within the same @@ -207,7 +206,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Create a input stream from network source hostname:port, where data is received + * Create an input stream from network source hostname:port, where data is received * as serialized blocks (serialized using the Spark's serializer) that can be directly * pushed into the block manager without deserializing them. This is the most efficient * way to receive data. @@ -226,7 +225,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Create a input stream from network source hostname:port, where data is received + * Create an input stream from network source hostname:port, where data is received * as serialized blocks (serialized using the Spark's serializer) that can be directly * pushed into the block manager without deserializing them. This is the most efficient * way to receive data. @@ -241,7 +240,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Create a input stream that monitors a Hadoop-compatible filesystem + * Create an input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. * Files must be written to the monitored directory by "moving" them from another * location within the same file system. File names starting with . are ignored. @@ -324,7 +323,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Creates a input stream from an queue of RDDs. In each batch, + * Creates an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * * NOTE: changes to the queue after the stream is created will not be recognized. @@ -340,7 +339,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Creates a input stream from an queue of RDDs. In each batch, + * Creates an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * * NOTE: changes to the queue after the stream is created will not be recognized. @@ -357,7 +356,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Creates a input stream from an queue of RDDs. In each batch, + * Creates an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * * NOTE: changes to the queue after the stream is created will not be recognized.
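The queueStream scaladoc above describes an input stream backed by a queue of RDDs, where changes made to the queue after the stream is created are ignored. A minimal Scala sketch of that usage follows; the local[2] master, the object name, and the sample data are illustrative assumptions, not taken from the patch.

import scala.collection.mutable.Queue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object QueueStreamSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Fill the queue before creating the stream; later changes to the queue
    // are not recognized, as the scaladoc notes.
    val rddQueue = new Queue[RDD[Int]]()
    for (i <- 1 to 30) {
      rddQueue += ssc.sparkContext.parallelize(1 to 1000, 10)
    }

    // Count each batch's values by their remainder modulo 10 and print the result.
    val inputStream = ssc.queueStream(rddQueue)
    inputStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}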
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 71a4c5c93e..6bff56a9d3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -37,8 +37,9 @@ import org.apache.spark.streaming.Duration * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous * sequence of RDDs (of the same type) representing a continuous stream of data (see * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs). - * DStreams can either be created from live data (such as, data from Kafka, Flume, sockets, HDFS) - * or it can be generated by transforming existing DStreams using operations such as `map`, + * DStreams can either be created from live data (such as data from TCP sockets, Kafka, Flume, + * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or be generated by + * transforming existing DStreams using operations such as `map`, * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream * periodically generates a RDD, either from live data or by transforming the RDD generated by a * parent DStream. @@ -540,7 +541,6 @@ abstract class DStream[T: ClassTag] ( * on each RDD of 'this' DStream. */ def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { - //new TransformedDStream(this, context.sparkContext.clean(transformFunc)) val cleanedF = context.sparkContext.clean(transformFunc) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { assert(rdds.length == 1) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index f57762321c..fb9df2f48e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -18,20 +18,17 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream._ import org.apache.spark.{Partitioner, HashPartitioner} import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions} -import org.apache.spark.storage.StorageLevel +import org.apache.spark.rdd.RDD import scala.collection.mutable.ArrayBuffer -import scala.reflect.{ClassTag, classTag} +import scala.reflect.ClassTag -import org.apache.hadoop.mapred.{JobConf, OutputFormat} +import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.hadoop.mapred.OutputFormat -import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.conf.Configuration import org.apache.spark.streaming.{Time, Duration} @@ -108,7 +105,7 @@ extends Serializable { /** * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the * combineByKey for RDDs. Please refer to combineByKey in - * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. + * org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information.
*/ def combineByKey[C: ClassTag]( createCombiner: V => C, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala index fdf5371a89..79ed696814 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala @@ -44,40 +44,49 @@ object ReceiverSupervisorStrategy { /** * A receiver trait to be mixed in with your Actor to gain access to - * pushBlock API. + * the API for pushing received data into Spark Streaming for processing. * * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html * * @example {{{ * class MyActor extends Actor with Receiver{ * def receive { - * case anything :String => pushBlock(anything) + * case anything: String => pushBlock(anything) * } * } - * //Can be plugged in actorStream as follows + * + * // Can be used with an actorStream as follows * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") * * }}} * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility + * @note Since the Actor may exist outside the Spark framework, it is the user's responsibility * to ensure the type safety, i.e parametrized type of push block and InputDStream * should be same. - * */ -trait Receiver { self: Actor ⇒ +trait Receiver { + + self: Actor ⇒ // to ensure that this can be added to Actor classes only + + /** + * Push an iterator of received data into Spark Streaming for processing + */ def pushBlock[T: ClassTag](iter: Iterator[T]) { context.parent ! Data(iter) } + /** + * Push a single item of received data into Spark Streaming for processing + */ def pushBlock[T: ClassTag](data: T) { context.parent ! Data(data) } - } /** - * Statistics for querying the supervisor about state of workers + * Statistics for querying the supervisor about state of workers. Used in + * conjunction with `StreamingContext.actorStream` and + * [[org.apache.spark.streaming.receivers.Receiver]]. */ case class Statistics(numberOfMsgs: Int, numberOfWorkers: Int, @@ -96,17 +105,15 @@ private[streaming] case class Data[T: ClassTag](data: T) * his own Actor to run as receiver for Spark Streaming input source. * * This starts a supervisor actor which starts workers and also provides - * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance]. + * [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance]. * - * Here's a way to start more supervisor/workers as its children. + * Here's a way to start more supervisor/workers as its children. * * @example {{{ * context.parent ! Props(new Supervisor) * }}} OR {{{ - * context.parent ! Props(new Worker,"Worker") + * context.parent ! Props(new Worker, "Worker") * }}} - * - * */ private[streaming] class ActorReceiver[T: ClassTag]( props: Props,
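The Receiver trait and the actorStream entry point documented above are easiest to see together. Below is a minimal Scala sketch of an actor-based receiver wired into a streaming word count; the local[2] master and the EchoActor/ActorStreamSketch names are hypothetical, chosen only for illustration.

import akka.actor.{Actor, Props}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.receivers.Receiver

// Every String message this actor receives is pushed into Spark Streaming
// through the Receiver trait's pushBlock.
class EchoActor extends Actor with Receiver {
  def receive = {
    case s: String => pushBlock(s)
  }
}

object ActorStreamSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ActorStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // The type parameter of actorStream must match the type the actor pushes,
    // as the @note on the Receiver trait requires.
    val lines = ssc.actorStream[String](Props(new EchoActor), "EchoReceiver")
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}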