Adding a Twitter InputDStream with an example

Patrick Wendell 2012-12-21 17:11:39 -08:00
parent 556c38ed91
commit 9ac4cb1c5f
3 changed files with 100 additions and 1 deletion

project/SparkBuild.scala

@@ -115,7 +115,8 @@ object SparkBuild extends Build {
       "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
       "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
       "Spray Repository" at "http://repo.spray.cc/",
-      "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
+      "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
+      "Twitter4J Repository" at "http://twitter4j.org/maven2/"
     ),
     libraryDependencies ++= Seq(
@@ -133,6 +134,8 @@ object SparkBuild extends Build {
       "com.typesafe.akka" % "akka-slf4j" % "2.0.3",
       "it.unimi.dsi" % "fastutil" % "6.4.4",
       "colt" % "colt" % "1.2.0",
+      "org.twitter4j" % "twitter4j-core" % "3.0.2",
+      "org.twitter4j" % "twitter4j-stream" % "3.0.2",
       "cc.spray" % "spray-can" % "1.0-M2.1",
       "cc.spray" % "spray-server" % "1.0-M2.1",
       "org.apache.mesos" % "mesos" % "0.9.0-incubating"

streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala

@@ -0,0 +1,59 @@
package spark.streaming

import spark.RDD
import spark.streaming.{Time, InputDStream}
import twitter4j._
import twitter4j.auth.BasicAuthorization
import collection.mutable.ArrayBuffer
import collection.JavaConversions._

/**
 * A stream of Twitter statuses, potentially filtered by one or more keywords.
 *
 * @constructor create a new Twitter stream using the supplied username and password to authenticate.
 * An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
 * such that this may return a sampled subset of all tweets during each interval.
 */
class TwitterInputDStream(
    @transient ssc_ : StreamingContext,
    username: String,
    password: String,
    filters: Seq[String]
  ) extends InputDStream[Status](ssc_) {

  // Statuses delivered by the Twitter4J listener thread are buffered here
  // until the next batch is computed.
  val statuses: ArrayBuffer[Status] = ArrayBuffer()
  var twitterStream: TwitterStream = _

  override def start() = {
    twitterStream = new TwitterStreamFactory()
      .getInstance(new BasicAuthorization(username, password))
    twitterStream.addListener(new StatusListener {
      def onStatus(status: Status) = {
        statuses += status
      }
      // Unimplemented
      def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
      def onTrackLimitationNotice(i: Int) {}
      def onScrubGeo(l: Long, l1: Long) {}
      def onStallWarning(stallWarning: StallWarning) {}
      def onException(e: Exception) {}
    })

    val query: FilterQuery = new FilterQuery
    if (filters.size > 0) {
      query.track(filters.toArray)
      twitterStream.filter(query)
    } else {
      twitterStream.sample()
    }
  }

  override def stop() = {
    twitterStream.shutdown()
  }

  override def compute(validTime: Time): Option[RDD[Status]] = {
    // Flush the current tweet buffer into an RDD for this batch interval
    val rdd = Some(ssc.sc.parallelize(statuses))
    statuses.clear()
    rdd
  }
}
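
As a usage sketch of the constructor described in the class comment: given a StreamingContext named ssc (as in the example below), placeholder credentials and illustrative filter terms, the two modes of the stream look like this:

// Tracks only statuses matching the given terms via the Twitter filter API
val filtered = new TwitterInputDStream(ssc, "user", "secret", Seq("#spark", "#hadoop"))

// With no filters, start() falls back to the public sample stream
val sampled = new TwitterInputDStream(ssc, "user", "secret", Seq())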

streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala

@@ -0,0 +1,37 @@
package spark.streaming.examples

import spark.streaming.StreamingContext._
import spark.streaming.{TwitterInputDStream, Seconds, StreamingContext}

object TwitterBasic {
  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>")
      System.exit(1)
    }
    val Array(master, username, password) = args

    val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
    val stream = new TwitterInputDStream(ssc, username, password, Seq())
    ssc.graph.addInputStream(stream)

    val hashTags = stream.flatMap(
      status => status.getText.split(" ").filter(_.startsWith("#")))

    // Word count over hashtags
    val counts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
    // TODO: Sorts on one node - should do with global sorting once streaming supports it
    val topCounts = counts.collect().map(_.sortBy(-_._2).take(5))

    topCounts.foreachRDD(rdd => {
      val topList = rdd.take(1)(0)
      println("\nPopular topics in last 60 seconds:")
      topList.foreach(t => println("%s (%s tweets)".format(t._1, t._2)))
    })

    ssc.start()
  }
}
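
For a first sanity check before running the hashtag analysis, a stripped-down companion sketch (not part of this commit) that only prints tweet text each batch, using the same APIs the example above already relies on; the object name is illustrative:

package spark.streaming.examples

import spark.streaming.{TwitterInputDStream, Seconds, StreamingContext}

object TwitterPrint {
  def main(args: Array[String]) {
    val Array(master, username, password) = args
    val ssc = new StreamingContext(master, "TwitterPrint", Seconds(2))
    val stream = new TwitterInputDStream(ssc, username, password, Seq())
    ssc.graph.addInputStream(stream)
    // Print up to ten raw tweet texts per batch
    stream.foreachRDD(rdd => rdd.take(10).foreach(s => println(s.getText)))
    ssc.start()
  }
}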