[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project

Include the following changes:

1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream
2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream"
3. Update the ActorWordCount example and add the JavaActorWordCount example
4. Make "streaming-zeromq" depend on "streaming-akka" and update the codes accordingly
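For callers of the removed API, a rough before/after migration sketch (a minimal example, not taken from this patch; the actor and app names are made up):

```scala
import akka.actor.Props

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

// A trivial receiver actor: every String message it gets is handed to Spark.
class MyActor extends ActorReceiver {
  def receive = {
    case s: String => store(s)
  }
}

object MigrationSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "MigrationSketch", Seconds(1))

    // Before this change (API removed by this patch):
    //   val lines = ssc.actorStream[String](Props[MyActor](), "MyReceiver")

    // After this change:
    val lines = AkkaUtils.createStream[String](ssc, Props[MyActor](), "MyReceiver")

    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```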

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10744 from zsxwing/streaming-akka-2.
Shixiong Zhu 2016-01-20 13:55:41 -08:00 committed by Tathagata Das
parent 944fdadf77
commit b7d74a602f
22 changed files with 602 additions and 186 deletions


@ -222,6 +222,18 @@ streaming_flume_sink = Module(
)
streaming_akka = Module(
name="streaming-akka",
dependencies=[streaming],
source_file_regexes=[
"external/akka",
],
sbt_test_goals=[
"streaming-akka/test",
]
)
streaming_flume = Module(
name="streaming-flume",
dependencies=[streaming],


@ -257,25 +257,54 @@ The following table summarizes the characteristics of both types of receivers
## Implementing and Using a Custom Actor-based Receiver
<div class="codetabs">
<div data-lang="scala" markdown="1" >
Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to
receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
trait can be mixed in to any Akka actor, which allows received data to be stored in Spark using
`store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc.
receive data. Extending [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver)
allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of
this actor can be configured to handle failures, etc.
{% highlight scala %}
class CustomActor extends Actor with ActorHelper {
class CustomActor extends ActorReceiver {
def receive = {
case data: String => store(data)
}
}
{% endhighlight %}
And a new input stream can be created with this custom actor as
{% highlight scala %}
// A new input stream can be created with this custom actor as
val ssc: StreamingContext = ...
val lines = ssc.actorStream[String](Props[CustomActor], "CustomReceiver")
val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")
{% endhighlight %}
See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala)
for an end-to-end example.
See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) for an end-to-end example.
</div>
<div data-lang="java" markdown="1">
Custom [Akka UntypedActors](http://doc.akka.io/docs/akka/2.3.11/java/untyped-actors.html) can also be used to
receive data. Extending [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver)
allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of
this actor can be configured to handle failures, etc.
{% highlight java %}
class CustomActor extends JavaActorReceiver {
@Override
public void onReceive(Object msg) throws Exception {
store((String) msg);
}
}
// A new input stream can be created with this custom actor as
JavaStreamingContext jssc = ...;
JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, Props.create(CustomActor.class), "CustomReceiver");
{% endhighlight %}
See [JavaActorWordCount.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java) for an end-to-end example.
</div>
</div>
<span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala libraries, AkkaUtils is not available in the Python API.


@ -659,11 +659,11 @@ methods for creating DStreams from files and Akka actors as input sources.
<span class="badge" style="background-color: grey">Python API</span> `fileStream` is not available in the Python API, only `textFileStream` is available.
- **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka
actors by using `streamingContext.actorStream(actorProps, actor-name)`. See the [Custom Receiver
actors by using `AkkaUtils.createStream(ssc, actorProps, actor-name)`. See the [Custom Receiver
Guide](streaming-custom-receivers.html) for more details.
<span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala
libraries, `actorStream` is not available in the Python API.
libraries, `AkkaUtils.createStream` is not available in the Python API.
- **Queue of RDDs as a Stream:** For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.
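The queue-of-RDDs bullet above describes the API without an example; a small testing sketch, assuming a StreamingContext `ssc` already exists:

```scala
import scala.collection.mutable

import org.apache.spark.rdd.RDD

// Each RDD pushed into the queue is consumed as one batch of the DStream.
val rddQueue = new mutable.Queue[RDD[Int]]()
val testStream = ssc.queueStream(rddQueue)
testStream.map(_ * 2).print()

ssc.start()
for (i <- 1 to 3) {
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.makeRDD(i to i + 99, numSlices = 2)
  }
  Thread.sleep(1000)
}
ssc.stop()
```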


@ -75,6 +75,11 @@
<artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
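For an application built with sbt rather than Maven, the equivalent dependency declaration would be roughly the following (a sketch; the version shown is this branch's snapshot, use a released Spark version in practice):

```scala
// build.sbt (sketch)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"      % "2.0.0-SNAPSHOT" % "provided",
  "org.apache.spark" %% "spark-streaming-akka" % "2.0.0-SNAPSHOT"
)
```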


@ -31,7 +31,8 @@ import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.JavaActorReceiver;
import org.apache.spark.streaming.akka.AkkaUtils;
import org.apache.spark.streaming.akka.JavaActorReceiver;
/**
* A sample actor as receiver, is also simplest. This receiver actor
@ -56,6 +57,7 @@ class JavaSampleActorReceiver<T> extends JavaActorReceiver {
remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
}
@Override
public void onReceive(Object msg) throws Exception {
store((T) msg);
}
@ -100,18 +102,20 @@ public class JavaActorWordCount {
String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor";
/*
* Following is the use of actorStream to plug in custom actor as receiver
* Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
*
* An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e type of data received and InputDstream
* should be same.
*
* For example: Both actorStream and JavaSampleActorReceiver are parameterized
* For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized
* to same type to ensure type safety.
*/
JavaDStream<String> lines = jssc.actorStream(
Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver");
JavaDStream<String> lines = AkkaUtils.createStream(
jssc,
Props.create(JavaSampleActorReceiver.class, feederActorURI),
"SampleReceiver");
// compute wordcount
lines.flatMap(new FlatMapFunction<String, String>() {


@ -22,12 +22,12 @@ import scala.collection.mutable.LinkedList
import scala.reflect.ClassTag
import scala.util.Random
import akka.actor.{actorRef2Scala, Actor, ActorRef, Props}
import akka.actor._
import com.typesafe.config.ConfigFactory
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.ActorReceiver
import org.apache.spark.util.AkkaUtils
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)
@ -78,8 +78,7 @@ class FeederActor extends Actor {
*
* @see [[org.apache.spark.examples.streaming.FeederActor]]
*/
class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
extends ActorReceiver {
class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver {
lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
@ -108,9 +107,13 @@ object FeederActor {
}
val Seq(host, port) = args.toSeq
val conf = new SparkConf
val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf,
securityManager = new SecurityManager(conf))._1
val akkaConf = ConfigFactory.parseString(
s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = $port
|""".stripMargin)
val actorSystem = ActorSystem("test", akkaConf)
val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
println("Feeder started as:" + feeder)
@ -121,6 +124,7 @@ object FeederActor {
/**
* A sample word count program demonstrating the use of plugging in
*
* Actor as Receiver
* Usage: ActorWordCount <hostname> <port>
* <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
@ -146,20 +150,21 @@ object ActorWordCount {
val ssc = new StreamingContext(sparkConf, Seconds(2))
/*
* Following is the use of actorStream to plug in custom actor as receiver
* Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
*
* An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e type of data received and InputDstream
* to ensure the type safety, i.e type of data received and InputDStream
* should be same.
*
* For example: Both actorStream and SampleActorReceiver are parameterized
* For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized
* to same type to ensure type safety.
*/
val lines = ssc.actorStream[String](
Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
host, port.toInt))), "SampleReceiver")
val lines = AkkaUtils.createStream[String](
ssc,
Props(classOf[SampleActorReceiver[String]],
"akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)),
"SampleReceiver")
// compute wordcount
lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()


@ -25,8 +25,9 @@ import akka.actor.actorRef2Scala
import akka.util.ByteString
import akka.zeromq._
import akka.zeromq.Subscribe
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.zeromq._
@ -69,10 +70,10 @@ object SimpleZeroMQPublisher {
*
* To run this example locally, you may run publisher as
* `$ bin/run-example \
* org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
* org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo`
* and run the example as
* `$ bin/run-example \
* org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo`
* org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo`
*/
// scalastyle:on
object ZeroMQWordCount {
@ -90,7 +91,11 @@ object ZeroMQWordCount {
def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
// For this stream, a zeroMQ publisher should be running.
val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
val lines = ZeroMQUtils.createStream(
ssc,
url,
Subscribe(topic),
bytesToStringIterator _)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

external/akka/pom.xml (new file, 73 lines)

@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-akka_2.10</artifactId>
<properties>
<sbt.project.name>streaming-akka</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project External Akka</name>
<url>http://spark.apache.org/</url>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-actor_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-remote_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>


@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.streaming.receiver
package org.apache.spark.streaming.akka
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
@ -26,23 +26,44 @@ import scala.reflect.ClassTag
import akka.actor._
import akka.actor.SupervisorStrategy.{Escalate, Restart}
import com.typesafe.config.ConfigFactory
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
/**
* :: DeveloperApi ::
* A helper with set of defaults for supervisor strategy
*/
@DeveloperApi
object ActorSupervisorStrategy {
object ActorReceiver {
val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
/**
* A OneForOneStrategy supervisor strategy with `maxNrOfRetries = 10` and
* `withinTimeRange = 15 millis`. For RuntimeException, it will restart the ActorReceiver; for
* others, it just escalates the failure to the supervisor of the supervisor.
*/
val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
15 millis) {
case _: RuntimeException => Restart
case _: Exception => Escalate
}
/**
* A default ActorSystem creator. It will use a unique system name
* (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
* communication.
*/
val defaultActorSystemCreator: () => ActorSystem = () => {
val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}"
val akkaConf = ConfigFactory.parseString(
s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
|""".stripMargin)
ActorSystem(uniqueSystemName, akkaConf)
}
}
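The new `defaultActorSystemCreator` above starts one remote-capable ActorSystem per receiver task; a caller that needs different Akka settings can supply its own creator instead, for example (a sketch; the configuration values are illustrative, and `ssc`/`CustomActor` stand for an existing StreamingContext and receiver actor):

```scala
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.akka.AkkaUtils

// Runs on the executor; the resulting ActorSystem is shut down when the receiver stops.
val mySystemCreator: () => ActorSystem = () => {
  val conf = ConfigFactory.parseString(
    """akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
      |akka.remote.netty.tcp.port = 0
      |""".stripMargin)
  ActorSystem("my-receiver-system", conf)
}

val lines = AkkaUtils.createStream[String](
  ssc,
  Props[CustomActor](),
  "CustomReceiver",
  StorageLevel.MEMORY_AND_DISK_SER_2,
  actorSystemCreator = mySystemCreator)
```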
/**
@ -58,13 +79,12 @@ object ActorSupervisorStrategy {
* }
* }
*
* // Can be used with an actorStream as follows
* ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
* AkkaUtils.createStream[String](ssc, Props[MyActor](),"MyActorReceiver")
*
* }}}
*
* @note Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e parametrized type of push block and InputDStream
* to ensure the type safety, i.e. parametrized type of push block and InputDStream
* should be same.
*/
@DeveloperApi
@ -103,18 +123,18 @@ abstract class ActorReceiver extends Actor {
*
* @example {{{
* class MyActor extends JavaActorReceiver {
* def receive {
* case anything: String => store(anything)
* @Override
* public void onReceive(Object msg) throws Exception {
* store((String) msg);
* }
* }
*
* // Can be used with an actorStream as follows
* ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
* AkkaUtils.<String>createStream(jssc, Props.create(MyActor.class), "MyActorReceiver");
*
* }}}
*
* @note Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e parametrized type of push block and InputDStream
* to ensure the type safety, i.e. parametrized type of push block and InputDStream
* should be same.
*/
@DeveloperApi
@ -147,8 +167,8 @@ abstract class JavaActorReceiver extends UntypedActor {
/**
* :: DeveloperApi ::
* Statistics for querying the supervisor about state of workers. Used in
* conjunction with `StreamingContext.actorStream` and
* [[org.apache.spark.streaming.receiver.ActorReceiver]].
* conjunction with `AkkaUtils.createStream` and
* [[org.apache.spark.streaming.akka.ActorReceiverSupervisor]].
*/
@DeveloperApi
case class Statistics(numberOfMsgs: Int,
@ -157,10 +177,10 @@ case class Statistics(numberOfMsgs: Int,
otherInfo: String)
/** Case class to receive data sent by child actors */
private[streaming] sealed trait ActorReceiverData
private[streaming] case class SingleItemData[T](item: T) extends ActorReceiverData
private[streaming] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
private[akka] sealed trait ActorReceiverData
private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData
private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
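These three messages mirror the `store` overloads on `ActorReceiver` (single item, iterator of items, raw bytes); a sketch of an actor exercising each of them, with the incoming message types made up for illustration:

```scala
import java.nio.ByteBuffer

import org.apache.spark.streaming.akka.ActorReceiver

// Hypothetical message types sent by an upstream publisher.
case class One(record: String)
case class Many(records: Iterator[String])
case class Raw(payload: Array[Byte])

class MultiStoreActor extends ActorReceiver {
  def receive = {
    case One(r)   => store(r)                   // becomes SingleItemData
    case Many(rs) => store(rs)                  // becomes IteratorData
    case Raw(b)   => store(ByteBuffer.wrap(b))  // becomes ByteBufferData
  }
}
```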
/**
* Provides Actors as receivers for receiving stream.
@ -181,14 +201,16 @@ private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorRec
* context.parent ! Props(new Worker, "Worker")
* }}}
*/
private[streaming] class ActorReceiverSupervisor[T: ClassTag](
private[akka] class ActorReceiverSupervisor[T: ClassTag](
actorSystemCreator: () => ActorSystem,
props: Props,
name: String,
storageLevel: StorageLevel,
receiverSupervisorStrategy: SupervisorStrategy
) extends Receiver[T](storageLevel) with Logging {
protected lazy val actorSupervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor),
private lazy val actorSystem = actorSystemCreator()
protected lazy val actorSupervisor = actorSystem.actorOf(Props(new Supervisor),
"Supervisor" + streamId)
class Supervisor extends Actor {
@ -241,5 +263,7 @@ private[streaming] class ActorReceiverSupervisor[T: ClassTag](
def onStop(): Unit = {
actorSupervisor ! PoisonPill
actorSystem.shutdown()
actorSystem.awaitTermination()
}
}


@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.akka
import scala.reflect.ClassTag
import akka.actor.{ActorSystem, Props, SupervisorStrategy}
import org.apache.spark.api.java.function.{Function0 => JFunction0}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
object AkkaUtils {
/**
* Create an input stream with a user-defined actor. See [[ActorReceiver]] for more details.
*
* @param ssc The StreamingContext instance
* @param propsForActor Props object defining creation of the actor
* @param actorName Name of the actor
* @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
* @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
* be shut down when the receiver is stopping (default:
* ActorReceiver.defaultActorSystemCreator)
* @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
*
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e. parametrized type of data received and createStream
* should be same.
*/
def createStream[T: ClassTag](
ssc: StreamingContext,
propsForActor: Props,
actorName: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") {
val cleanF = ssc.sc.clean(actorSystemCreator)
ssc.receiverStream(new ActorReceiverSupervisor[T](
cleanF,
propsForActor,
actorName,
storageLevel,
supervisorStrategy))
}
/**
* Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
*
* @param jssc The StreamingContext instance
* @param propsForActor Props object defining creation of the actor
* @param actorName Name of the actor
* @param storageLevel Storage level to use for storing the received objects
* @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
* be shut down when the receiver is stopping.
* @param supervisorStrategy the supervisor strategy
*
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e. parametrized type of data received and createStream
* should be same.
*/
def createStream[T](
jssc: JavaStreamingContext,
propsForActor: Props,
actorName: String,
storageLevel: StorageLevel,
actorSystemCreator: JFunction0[ActorSystem],
supervisorStrategy: SupervisorStrategy
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
createStream[T](
jssc.ssc,
propsForActor,
actorName,
storageLevel,
() => actorSystemCreator.call(),
supervisorStrategy)
}
/**
* Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
*
* @param jssc The StreamingContext instance
* @param propsForActor Props object defining creation of the actor
* @param actorName Name of the actor
* @param storageLevel Storage level to use for storing the received objects
*
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e. parametrized type of data received and createStream
* should be same.
*/
def createStream[T](
jssc: JavaStreamingContext,
propsForActor: Props,
actorName: String,
storageLevel: StorageLevel
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
createStream[T](jssc.ssc, propsForActor, actorName, storageLevel)
}
/**
* Create an input stream with a user-defined actor. Storage level of the data will be the default
* StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details.
*
* @param jssc The StreamingContext instance
* @param propsForActor Props object defining creation of the actor
* @param actorName Name of the actor
*
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e. parametrized type of data received and createStream
* should be same.
*/
def createStream[T](
jssc: JavaStreamingContext,
propsForActor: Props,
actorName: String
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
createStream[T](jssc.ssc, propsForActor, actorName)
}
}


@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.akka;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.Test;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
public class JavaAkkaUtilsSuite {
@Test // tests the API, does not actually test data receiving
public void testAkkaUtils() {
JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
try {
JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream(
jsc, Props.create(JavaTestActor.class), "test");
JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream(
jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2());
JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream(
jsc,
Props.create(JavaTestActor.class),
"test", StorageLevel.MEMORY_AND_DISK_SER_2(),
new ActorSystemCreatorForTest(),
SupervisorStrategy.defaultStrategy());
} finally {
jsc.stop();
}
}
}
class ActorSystemCreatorForTest implements Function0<ActorSystem> {
@Override
public ActorSystem call() {
return null;
}
}
class JavaTestActor extends JavaActorReceiver {
@Override
public void onReceive(Object message) throws Exception {
store((String) message);
}
}


@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.akka
import akka.actor.{Props, SupervisorStrategy}
import org.apache.spark.SparkFunSuite
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
class AkkaUtilsSuite extends SparkFunSuite {
test("createStream") {
val ssc: StreamingContext = new StreamingContext("local[2]", "test", Seconds(1000))
try {
// tests the API, does not actually test data receiving
val test1: ReceiverInputDStream[String] = AkkaUtils.createStream(
ssc, Props[TestActor](), "test")
val test2: ReceiverInputDStream[String] = AkkaUtils.createStream(
ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2)
val test3: ReceiverInputDStream[String] = AkkaUtils.createStream(
ssc,
Props[TestActor](),
"test",
StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy = SupervisorStrategy.defaultStrategy)
val test4: ReceiverInputDStream[String] = AkkaUtils.createStream(
ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
val test5: ReceiverInputDStream[String] = AkkaUtils.createStream(
ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
val test6: ReceiverInputDStream[String] = AkkaUtils.createStream(
ssc,
Props[TestActor](),
"test",
StorageLevel.MEMORY_AND_DISK_SER_2,
() => null,
SupervisorStrategy.defaultStrategy)
} finally {
ssc.stop()
}
}
}
class TestActor extends ActorReceiver {
override def receive: Receive = {
case m: String => store(m)
}
}


@ -41,6 +41,11 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>


@ -23,7 +23,7 @@ import akka.util.ByteString
import akka.zeromq._
import org.apache.spark.Logging
import org.apache.spark.streaming.receiver.ActorReceiver
import org.apache.spark.streaming.akka.ActorReceiver
/**
* A receiver to subscribe to ZeroMQ stream.


@ -20,29 +20,33 @@ package org.apache.spark.streaming.zeromq
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import akka.actor.{Props, SupervisorStrategy}
import akka.actor.{ActorSystem, Props, SupervisorStrategy}
import akka.util.ByteString
import akka.zeromq.Subscribe
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
object ZeroMQUtils {
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param ssc StreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
* @param ssc StreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
* and each frame has sequence of byte thus it needs the converter
* (which might be deserializer of bytes) to translate from sequence
* of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
* @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
* @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
* be shut down when the receiver is stopping (default:
* ActorReceiver.defaultActorSystemCreator)
* @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
*/
def createStream[T: ClassTag](
ssc: StreamingContext,
@ -50,22 +54,31 @@ object ZeroMQUtils {
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] => Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
): ReceiverInputDStream[T] = {
ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
AkkaUtils.createStream(
ssc,
Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
"ZeroMQReceiver",
storageLevel,
actorSystemCreator,
supervisorStrategy)
}
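With the new `actorSystemCreator` parameter, a Scala caller can override how the receiver's ActorSystem is built, for example (a sketch; the URL, topic, and creator are placeholders, and `ssc` is an existing StreamingContext):

```scala
import akka.actor.ActorSystem
import akka.util.ByteString
import akka.zeromq.Subscribe

import org.apache.spark.streaming.zeromq.ZeroMQUtils

// Turn each ZeroMQ frame into a UTF-8 string.
def bytesToStrings(frames: Seq[ByteString]): Iterator[String] =
  frames.map(_.utf8String).iterator

// storageLevel and supervisorStrategy keep their defaults here.
val lines = ZeroMQUtils.createStream(
  ssc,
  "tcp://127.0.0.1:1234",
  Subscribe("foo"),
  bytesToStrings _,
  actorSystemCreator = () => ActorSystem("zeromq-receiver-system"))
```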
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote ZeroMQ publisher
* @param subscribe Topic to subscribe to
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote ZeroMQ publisher
* @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
* frame has sequence of byte thus it needs the converter(which might be
* deserializer of bytes) to translate from sequence of sequence of bytes,
* where sequence refer to a frame and sub sequence refer to its payload.
* @param storageLevel Storage level to use for storing the received objects
* @param storageLevel Storage level to use for storing the received objects
* @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
* be shut down when the receiver is stopping.
* @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
*/
def createStream[T](
jssc: JavaStreamingContext,
@ -73,25 +86,33 @@ object ZeroMQUtils {
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel,
actorSystemCreator: JFunction0[ActorSystem],
supervisorStrategy: SupervisorStrategy
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn =
(x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
createStream[T](
jssc.ssc,
publisherUrl,
subscribe,
fn,
storageLevel,
() => actorSystemCreator.call(),
supervisorStrategy)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
* frame has sequence of byte thus it needs the converter(which might be
* deserializer of bytes) to translate from sequence of sequence of bytes,
* where sequence refer to a frame and sub sequence refer to its payload.
* @param storageLevel RDD storage level.
* @param storageLevel RDD storage level.
*/
def createStream[T](
jssc: JavaStreamingContext,
@ -104,14 +125,19 @@ object ZeroMQUtils {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn =
(x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
createStream[T](
jssc.ssc,
publisherUrl,
subscribe,
fn,
storageLevel)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
* frame has sequence of byte thus it needs the converter(which might
* be deserializer of bytes) to translate from sequence of sequence of
@ -128,6 +154,10 @@ object ZeroMQUtils {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn =
(x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
createStream[T](
jssc.ssc,
publisherUrl,
subscribe,
fn)
}
}


@ -17,14 +17,17 @@
package org.apache.spark.streaming.zeromq;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;
import akka.actor.ActorSystem;
import akka.actor.SupervisorStrategy;
import akka.util.ByteString;
import akka.zeromq.Subscribe;
import org.junit.Test;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
@ -32,19 +35,29 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
public void testZeroMQStream() {
String publishUrl = "abc";
Subscribe subscribe = new Subscribe((ByteString)null);
Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
@Override
public Iterable<String> call(byte[][] bytes) throws Exception {
return null;
}
};
Function<byte[][], Iterable<String>> bytesToObjects = new BytesToObjects();
Function0<ActorSystem> actorSystemCreator = new ActorSystemCreatorForTest();
JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream(
ssc, publishUrl, subscribe, bytesToObjects);
JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream(
ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream(
ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator,
SupervisorStrategy.defaultStrategy());
}
}
class BytesToObjects implements Function<byte[][], Iterable<String>> {
@Override
public Iterable<String> call(byte[][] bytes) throws Exception {
return null;
}
}
class ActorSystemCreatorForTest implements Function0<ActorSystem> {
@Override
public ActorSystem call() {
return null;
}
}


@ -42,14 +42,22 @@ class ZeroMQStreamSuite extends SparkFunSuite {
// tests the API, does not actually test data receiving
val test1: ReceiverInputDStream[String] =
ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
ZeroMQUtils.createStream(
ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null)
val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
ssc, publishUrl, subscribe, bytesToObjects,
StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy)
val test4: ReceiverInputDStream[String] =
ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
ssc, publishUrl, subscribe, bytesToObjects,
StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy)
// TODO: Actually test data receiving
// TODO: Actually test data receiving. A real test needs the native ZeroMQ library
ssc.stop()
}
}


@ -104,6 +104,7 @@
<module>external/flume</module>
<module>external/flume-sink</module>
<module>external/flume-assembly</module>
<module>external/akka</module>
<module>external/mqtt</module>
<module>external/mqtt-assembly</module>
<module>external/zeromq</module>


@ -153,6 +153,16 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$_log_="),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log_"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log__=")
) ++ Seq(
// SPARK-7799 Add "streaming-akka" project
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream$default$6"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream$default$5"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream$default$4"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream$default$3"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.actorStream")
) ++ Seq(
// SPARK-12847 Remove StreamingListenerBus and post all Streaming events to the same thread as Spark events
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus$"),


@ -35,11 +35,11 @@ object BuildCommons {
private val buildLocation = file(".").getAbsoluteFile.getParentFile
val allProjects@Seq(catalyst, core, graphx, hive, hiveThriftServer, mllib, repl,
sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka,
sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingAkka, streamingKafka,
streamingMqtt, streamingTwitter, streamingZeromq, launcher, unsafe, testTags) =
Seq("catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
"sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink",
"streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
"streaming-flume", "streaming-akka", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
"streaming-zeromq", "launcher", "unsafe", "test-tags").map(ProjectRef(buildLocation, _))
val optionallyEnabledProjects@Seq(yarn, java8Tests, sparkGangliaLgpl,
@ -232,8 +232,9 @@ object SparkBuild extends PomBuild {
/* Enable tests settings for all projects except examples, assembly and tools */
(allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings))
// TODO: remove streamingAkka from this list after 2.0.0
allProjects.filterNot(x => Seq(spark, hive, hiveThriftServer, catalyst, repl,
networkCommon, networkShuffle, networkYarn, unsafe, testTags).contains(x)).foreach {
networkCommon, networkShuffle, networkYarn, unsafe, streamingAkka, testTags).contains(x)).foreach {
x => enable(MimaBuild.mimaSettings(sparkHome, x))(x)
}
@ -649,7 +650,7 @@ object Unidoc {
"-public",
"-group", "Core Java API", packageList("api.java", "api.java.function"),
"-group", "Spark Streaming", packageList(
"streaming.api.java", "streaming.flume", "streaming.kafka",
"streaming.api.java", "streaming.flume", "streaming.akka", "streaming.kafka",
"streaming.mqtt", "streaming.twitter", "streaming.zeromq", "streaming.kinesis"
),
"-group", "MLlib", packageList(


@ -25,7 +25,6 @@ import scala.collection.mutable.Queue
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@ -42,7 +41,7 @@ import org.apache.spark.serializer.SerializationDebugger
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiverSupervisor, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils}
@ -295,27 +294,6 @@ class StreamingContext private[streaming] (
}
}
/**
* Create an input stream with any arbitrary user implemented actor receiver.
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param props Props object defining creation of the actor
* @param name Name of the actor
* @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e parametrized type of data received and actorStream
* should be same.
*/
def actorStream[T: ClassTag](
props: Props,
name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
): ReceiverInputDStream[T] = withNamedScope("actor stream") {
receiverStream(new ActorReceiverSupervisor[T](props, name, storageLevel, supervisorStrategy))
}
/**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited


@ -24,7 +24,6 @@ import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
@ -356,69 +355,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf)
}
/**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
* @param name Name of the actor
* @param storageLevel Storage level to use for storing the received objects
*
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e parametrized type of data received and actorStream
* should be same.
*/
def actorStream[T](
props: Props,
name: String,
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
}
/**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
* @param name Name of the actor
* @param storageLevel Storage level to use for storing the received objects
*
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e parametrized type of data received and actorStream
* should be same.
*/
def actorStream[T](
props: Props,
name: String,
storageLevel: StorageLevel
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name, storageLevel)
}
/**
* Create an input stream with any arbitrary user implemented actor receiver.
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param props Props object defining creation of the actor
* @param name Name of the actor
*
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e parametrized type of data received and actorStream
* should be same.
*/
def actorStream[T](
props: Props,
name: String
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name)
}
/**
* Create an input stream from an queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.