fixes corresponding to review feedback at pull request #479

This commit is contained in:
Prashant Sharma 2013-02-20 12:33:37 +05:30
parent 8d44480d84
commit 4e5b09664c
3 changed files with 9 additions and 6 deletions

View file

@ -35,20 +35,23 @@ object SimpleZeroMQPublisher {
/** /**
* A sample wordcount with ZeroMQStream stream * A sample wordcount with ZeroMQStream stream
* *
* Usage: WordCountZeroMQ <master> <zeroMQurl> <topic> * To work with zeroMQ, some native libraries have to be installed.
* Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide](http://www.zeromq.org/intro:get-the-software)
*
* Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>
* In local mode, <master> should be 'local[n]' with n > 1 * In local mode, <master> should be 'local[n]' with n > 1
* <zeroMQurl> and <topic> describe where zeroMq publisher is running. * <zeroMQurl> and <topic> describe where zeroMq publisher is running.
* *
* To run this example locally, you may run publisher as * To run this example locally, you may run publisher as
* `$ ./run spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` * `$ ./run spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
* and then run the example * and run the example as
* `$ ./run spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo` * `$ ./run spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
*/ */
object ZeroMQWordCount { object ZeroMQWordCount {
def main(args: Array[String]) { def main(args: Array[String]) {
if (args.length < 3) { if (args.length < 3) {
System.err.println( System.err.println(
"Usage: WordCountZeroMQ <master> <zeroMQurl> <topic>" + "Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>" +
"In local mode, <master> should be 'local[n]' with n > 1") "In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1) System.exit(1)
} }

View file

@ -134,7 +134,6 @@ object SparkBuild extends Build {
"com.typesafe.akka" % "akka-actor" % "2.0.3", "com.typesafe.akka" % "akka-actor" % "2.0.3",
"com.typesafe.akka" % "akka-remote" % "2.0.3", "com.typesafe.akka" % "akka-remote" % "2.0.3",
"com.typesafe.akka" % "akka-slf4j" % "2.0.3", "com.typesafe.akka" % "akka-slf4j" % "2.0.3",
"com.typesafe.akka" % "akka-zeromq" % "2.0.3",
"it.unimi.dsi" % "fastutil" % "6.4.4", "it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0", "colt" % "colt" % "1.2.0",
"cc.spray" % "spray-can" % "1.0-M2.1", "cc.spray" % "spray-can" % "1.0-M2.1",
@ -165,7 +164,8 @@ object SparkBuild extends Build {
libraryDependencies ++= Seq( libraryDependencies ++= Seq(
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile", "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
"com.github.sgroschupf" % "zkclient" % "0.1", "com.github.sgroschupf" % "zkclient" % "0.1",
"org.twitter4j" % "twitter4j-stream" % "3.0.3" "org.twitter4j" % "twitter4j-stream" % "3.0.3",
"com.typesafe.akka" % "akka-zeromq" % "2.0.3"
) )
) ++ assemblySettings ++ extraAssemblySettings ) ++ assemblySettings ++ extraAssemblySettings

View file

@ -177,7 +177,7 @@ class StreamingContext private (
} }
/** /**
* ZeroMQ stream receiver * Create an input stream that receives messages pushed by a zeromq publisher.
* @param publisherUrl Url of remote zeromq publisher * @param publisherUrl Url of remote zeromq publisher
* @param zeroMQ topic to subscribe to * @param zeroMQ topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence