diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index b2e0aff407..af32a63df8 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -116,7 +116,8 @@ object SparkBuild extends Build {
       "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
       "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
       "Spray Repository" at "http://repo.spray.cc/",
-      "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
+      "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
+      "Twitter4J Repository" at "http://twitter4j.org/maven2/"
     ),
 
     libraryDependencies ++= Seq(
@@ -134,6 +135,8 @@ object SparkBuild extends Build {
       "com.typesafe.akka" % "akka-slf4j" % "2.0.3",
       "it.unimi.dsi" % "fastutil" % "6.4.4",
       "colt" % "colt" % "1.2.0",
+      "org.twitter4j" % "twitter4j-core" % "3.0.2",
+      "org.twitter4j" % "twitter4j-stream" % "3.0.2",
       "cc.spray" % "spray-can" % "1.0-M2.1",
       "cc.spray" % "spray-server" % "1.0-M2.1",
       "org.apache.mesos" % "mesos" % "0.9.0-incubating"
diff --git a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala
new file mode 100644
index 0000000000..adf1ed15c9
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala
@@ -0,0 +1,66 @@
+package spark.streaming
+
+import spark.RDD
+import twitter4j._
+import twitter4j.auth.BasicAuthorization
+import collection.mutable.ArrayBuffer
+import collection.JavaConversions._
+
+/** A stream of Twitter statuses, potentially filtered by one or more keywords.
+  *
+  * @constructor create a new Twitter stream using the supplied username and password to authenticate.
+  * An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
+  * such that this may return a sampled subset of all tweets during each interval.
+  */
+class TwitterInputDStream(
+    @transient ssc_ : StreamingContext,
+    username: String,
+    password: String,
+    filters: Seq[String]
+  ) extends InputDStream[Status](ssc_) {
+  // Statuses received since the last batch was emitted. Appended to by the
+  // twitter4j listener thread and drained by compute() on the scheduler thread.
+  val statuses: ArrayBuffer[Status] = ArrayBuffer()
+  var twitterStream: TwitterStream = _  // opened in start(), null until then
+
+  /** Open the twitter4j stream and register a listener that buffers statuses. */
+  override def start() = {
+    twitterStream = new TwitterStreamFactory()
+      .getInstance(new BasicAuthorization(username, password))
+    twitterStream.addListener(new StatusListener {
+      def onStatus(status: Status) = {
+        statuses += status
+      }
+      // Unimplemented
+      def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
+      def onTrackLimitationNotice(i: Int) {}
+      def onScrubGeo(l: Long, l1: Long) {}
+      def onStallWarning(stallWarning: StallWarning) {}
+      def onException(e: Exception) {}
+    })
+
+    // With filters Twitter returns only matching tweets; without, a global sample.
+    val query: FilterQuery = new FilterQuery
+    if (filters.size > 0) {
+      query.track(filters.toArray)
+      twitterStream.filter(query)
+    } else {
+      twitterStream.sample()
+    }
+  }
+
+  /** Shut down the underlying stream, if start() ever opened one. */
+  override def stop() = {
+    if (twitterStream != null) twitterStream.shutdown()
+  }
+
+  /** Emit all statuses buffered since the previous batch as one RDD. */
+  override def compute(validTime: Time): Option[RDD[Status]] = {
+    // Snapshot the buffer before clearing so the RDD keeps its own copy.
+    // NOTE: removing elements while iterating (statuses -= x inside foreach)
+    // would skip every other element and leak tweets into the next interval.
+    val rdd = Some(ssc.sc.parallelize(statuses.toList))
+    statuses.clear()
+    rdd
+  }
+}
diff --git a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala b/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala
new file mode 100644
index 0000000000..19b3cad6ad
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala
@@ -0,0 +1,49 @@
+package spark.streaming.examples
+
+import spark.streaming.StreamingContext._
+import spark.streaming.{TwitterInputDStream, Seconds, StreamingContext}
+
+/** Prints the most popular hashtags and the tweet count seen over a sliding
+  * 60-second window of the Twitter sample/filter stream. */
+object TwitterBasic {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" +
+        " [filter1] [filter2] ... [filter n]")
+      System.exit(1)
+    }
+
+    val Array(master, username, password) = args.slice(0, 3)
+    val filters = args.slice(3, args.length)
+
+    val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
+    val stream = new TwitterInputDStream(ssc, username, password, filters)
+    ssc.graph.addInputStream(stream)
+
+    // Split each status into words and keep only the hashtags.
+    val hashTags = stream.flatMap(
+      status => status.getText.split(" ").filter(_.startsWith("#")))
+
+    // Word count over hashtags
+    val counts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
+    // TODO: Sorts on one node - should do with global sorting once streaming supports it
+    val topCounts = counts.collect().map(_.sortBy(-_._2).take(5))
+
+    // Print popular hashtags
+    topCounts.foreachRDD(rdd => {
+      if (rdd.count() != 0) {
+        val topList = rdd.take(1)(0)
+        println("\nPopular topics in last 60 seconds:")
+        topList.foreach{case (tag, count) => println("%s (%s tweets)".format(tag, count))}
+      }
+    })
+
+    // Print number of tweets in the window
+    stream.window(Seconds(60)).count().foreachRDD(rdd =>
+      if (rdd.count() != 0) {
+        println("Window size: %s tweets".format(rdd.take(1)(0)))
+      }
+    )
+    ssc.start()
+  }
+}