Merge pull request #8 from radlab/twitter-example

Adding a Twitter InputDStream with an example
This commit is contained in:
Patrick Wendell 2012-12-29 14:23:01 -08:00
commit 518111573f
3 changed files with 108 additions and 1 deletions

View file

@ -116,7 +116,8 @@ object SparkBuild extends Build {
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
"Spray Repository" at "http://repo.spray.cc/",
"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
"Twitter4J Repository" at "http://twitter4j.org/maven2/"
),
libraryDependencies ++= Seq(
@ -134,6 +135,8 @@ object SparkBuild extends Build {
"com.typesafe.akka" % "akka-slf4j" % "2.0.3",
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
"org.twitter4j" % "twitter4j-core" % "3.0.2",
"org.twitter4j" % "twitter4j-stream" % "3.0.2",
"cc.spray" % "spray-can" % "1.0-M2.1",
"cc.spray" % "spray-server" % "1.0-M2.1",
"org.apache.mesos" % "mesos" % "0.9.0-incubating"

View file

@ -0,0 +1,58 @@
package spark.streaming
import spark.RDD
import twitter4j._
import twitter4j.auth.BasicAuthorization
import collection.mutable.ArrayBuffer
import collection.JavaConversions._
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
* @constructor create a new Twitter stream using the supplied username and password to authenticate.
* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
* such that this may return a sampled subset of all tweets during each interval.
*/
class TwitterInputDStream(
@transient ssc_ : StreamingContext,
username: String,
password: String,
filters: Seq[String]
) extends InputDStream[Status](ssc_) {
val statuses: ArrayBuffer[Status] = ArrayBuffer()
var twitterStream: TwitterStream = _
override def start() = {
twitterStream = new TwitterStreamFactory()
.getInstance(new BasicAuthorization(username, password))
twitterStream.addListener(new StatusListener {
def onStatus(status: Status) = {
statuses += status
}
// Unimplemented
def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
def onTrackLimitationNotice(i: Int) {}
def onScrubGeo(l: Long, l1: Long) {}
def onStallWarning(stallWarning: StallWarning) {}
def onException(e: Exception) {}
})
val query: FilterQuery = new FilterQuery
if (filters.size > 0) {
query.track(filters.toArray)
twitterStream.filter(query)
} else {
twitterStream.sample()
}
}
override def stop() = {
twitterStream.shutdown()
}
override def compute(validTime: Time): Option[RDD[Status]] = {
// Flush the current tweet buffer
val rdd = Some(ssc.sc.parallelize(statuses))
statuses.foreach(x => statuses -= x)
rdd
}
}

View file

@ -0,0 +1,46 @@
package spark.streaming.examples
import spark.streaming.StreamingContext._
import spark.streaming.{TwitterInputDStream, Seconds, StreamingContext}
object TwitterBasic {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
val stream = new TwitterInputDStream(ssc, username, password, filters)
ssc.graph.addInputStream(stream)
val hashTags = stream.flatMap(
status => status.getText.split(" ").filter(_.startsWith("#")))
// Word count over hashtags
val counts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
// TODO: Sorts on one node - should do with global sorting once streaming supports it
val topCounts = counts.collect().map(_.sortBy(-_._2).take(5))
// Print popular hashtags
topCounts.foreachRDD(rdd => {
if (rdd.count() != 0) {
val topList = rdd.take(1)(0)
println("\nPopular topics in last 60 seconds:")
topList.foreach{case (tag, count) => println("%s (%s tweets)".format(tag, count))}
}
})
// Print number of tweets in the window
stream.window(Seconds(60)).count().foreachRDD(rdd =>
if (rdd.count() != 0) {
println("Window size: %s tweets".format(rdd.take(1)(0)))
}
)
ssc.start()
}
}