Merge pull request #480 from MLnick/streaming-eg-algebird
[Streaming] Examples using Twitter's Algebird library
commit cfa65ebff1
@@ -20,11 +20,10 @@
       <artifactId>jetty-server</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.twitter4j</groupId>
-      <artifactId>twitter4j-stream</artifactId>
-      <version>3.0.3</version>
+      <groupId>com.twitter</groupId>
+      <artifactId>algebird-core_2.9.2</artifactId>
+      <version>0.1.9</version>
     </dependency>
-
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.version}</artifactId>
@@ -0,0 +1,93 @@
package spark.streaming.examples

import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel
import com.twitter.algebird._
import spark.streaming.StreamingContext._
import spark.SparkContext._

/**
 * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
 * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
 * <br>
 * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
 * the example operates on Long IDs. Once the implementation supports other inputs (such as String),
 * the same approach could be used for computing popular topics, for example.
 * <p>
 * <p>
 * <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
 * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure
 * for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc.)
 * that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
 * estimated count of an element can be computed, as well as the "heavy hitters" that occur more than a threshold
 * percentage of the overall total count.
 * <p><p>
 * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation.
 */
object TwitterAlgebirdCMS {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: TwitterAlgebirdCMS <master> <twitter_username> <twitter_password>" +
        " [filter1] [filter2] ... [filter n]")
      System.exit(1)
    }

    // CMS parameters
    val DELTA = 1E-3
    val EPS = 0.01
    val SEED = 1
    val PERC = 0.001
    // K highest-frequency elements to take
    val TOPK = 10

    val Array(master, username, password) = args.slice(0, 3)
    val filters = args.slice(3, args.length)

    val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10))
    val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)

    val users = stream.map(status => status.getUser.getId)

    val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC)
    var globalCMS = cms.zero
    val mm = new MapMonoid[Long, Int]()
    var globalExact = Map[Long, Int]()

    val approxTopUsers = users.mapPartitions(ids => {
      ids.map(id => cms.create(id))
    }).reduce(_ ++ _)

    val exactTopUsers = users.map(id => (id, 1))
      .reduceByKey((a, b) => a + b)

    approxTopUsers.foreach(rdd => {
      if (rdd.count() != 0) {
        val partial = rdd.first()
        val partialTopK = partial.heavyHitters.map(id =>
          (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
        globalCMS ++= partial
        val globalTopK = globalCMS.heavyHitters.map(id =>
          (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
        println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
          partialTopK.mkString("[", ",", "]")))
        println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
          globalTopK.mkString("[", ",", "]")))
      }
    })

    exactTopUsers.foreach(rdd => {
      if (rdd.count() != 0) {
        val partialMap = rdd.collect().toMap
        val partialTopK = rdd.map(
          {case (id, count) => (count, id)})
          .sortByKey(ascending = false).take(TOPK)
        globalExact = mm.plus(globalExact.toMap, partialMap)
        val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
        println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
        println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
      }
    })

    ssc.start()
  }
}
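The scaladoc above notes that Algebird's CMS is a monoid, which is what lets the example combine the per-partition sketches with reduce(_ ++ _). Below is a minimal standalone sketch of that merge step outside Spark, reusing the example's CountMinSketchMonoid parameters; the object name CMSMergeSketch and the sample ids are illustrative only and are not part of the commit.

import com.twitter.algebird.CountMinSketchMonoid

// Minimal sketch of the CMS monoid merge used above; not part of this commit.
object CMSMergeSketch {
  def main(args: Array[String]) {
    // Same accuracy/seed/heavy-hitter parameters as TwitterAlgebirdCMS above
    val cms = new CountMinSketchMonoid(1E-3, 0.01, 1, 0.001)
    val ids = Seq(1L, 2L, 2L, 3L, 3L, 3L)
    // One single-element sketch per id, merged with the monoid's ++ (as in reduce(_ ++ _))
    val merged = ids.map(id => cms.create(id)).reduce(_ ++ _)
    println("Estimated count of id 3: " + merged.frequency(3L).estimate)
    println("Heavy hitters: " + merged.heavyHitters)
  }
}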
@@ -0,0 +1,71 @@
package spark.streaming.examples

import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel
import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid
import spark.streaming.dstream.TwitterInputDStream

/**
 * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
 * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
 * <p>
 * <p>
 * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
 * blog post</a> and this
 * <a href="http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">blog post</a>
 * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient data structure for estimating
 * the cardinality of a data stream, i.e. the number of unique elements.
 * <p><p>
 * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the reduce operation.
 */
object TwitterAlgebirdHLL {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: TwitterAlgebirdHLL <master> <twitter_username> <twitter_password>" +
        " [filter1] [filter2] ... [filter n]")
      System.exit(1)
    }

    /** Bit size parameter for HyperLogLog, trades off accuracy vs. size */
    val BIT_SIZE = 12
    val Array(master, username, password) = args.slice(0, 3)
    val filters = args.slice(3, args.length)

    val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5))
    val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)

    val users = stream.map(status => status.getUser.getId)

    val hll = new HyperLogLogMonoid(BIT_SIZE)
    var globalHll = hll.zero
    var userSet: Set[Long] = Set()

    val approxUsers = users.mapPartitions(ids => {
      ids.map(id => hll(id))
    }).reduce(_ + _)

    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)

    approxUsers.foreach(rdd => {
      if (rdd.count() != 0) {
        val partial = rdd.first()
        globalHll += partial
        println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
        println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
      }
    })

    exactUsers.foreach(rdd => {
      if (rdd.count() != 0) {
        val partial = rdd.first()
        userSet ++= partial
        println("Exact distinct users this batch: %d".format(partial.size))
        println("Exact distinct users overall: %d".format(userSet.size))
        println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100))
      }
    })

    ssc.start()
  }
}
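As with the CMS example, the per-partition HLL estimates can be combined because HyperLogLogMonoid values form a monoid; reduce(_ + _) merges them into one estimate. Below is a minimal standalone sketch of that merge outside Spark; the object name HLLMergeSketch and the sample ids are illustrative only and are not part of the commit.

import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid

// Minimal sketch of the HLL monoid merge used above; not part of this commit.
object HLLMergeSketch {
  def main(args: Array[String]) {
    // Same BIT_SIZE as TwitterAlgebirdHLL above
    val hll = new HyperLogLogMonoid(12)
    val ids = Seq(1L, 2L, 2L, 3L, 3L, 3L)
    // HyperLogLog._ supplies the implicit Long => Array[Byte] conversion used by hll(id)
    val merged = ids.map(id => hll(id)).reduce(_ + _)
    println("Approx distinct ids: " + merged.estimatedSize.toInt)  // expected to be close to 3
  }
}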
@@ -154,7 +154,8 @@ object SparkBuild extends Build {
   )

   def examplesSettings = sharedSettings ++ Seq(
-    name := "spark-examples"
+    name := "spark-examples",
+    libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.9")
   )

   def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
@@ -47,6 +47,16 @@
       <artifactId>zkclient</artifactId>
       <version>0.1</version>
     </dependency>
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-stream</artifactId>
+      <version>3.0.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-core</artifactId>
+      <version>3.0.3</version>
+    </dependency>

     <dependency>
       <groupId>org.scalatest</groupId>