[SPARK-9057][STREAMING] Twitter example joining to static RDD of word sentiment values

Example of joining a static RDD of word sentiments to a streaming RDD of tweets, demonstrating the usage of the transform() method.
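The pattern the example demonstrates is compact enough to sketch in isolation. Below is a minimal, self-contained Scala sketch of the same idea, not code from this patch: a queueStream of pre-built (word, count) batches stands in for the live Twitter/socket source, and the names TransformJoinSketch, staticSentiments and batches are illustrative.

    import scala.collection.mutable

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object TransformJoinSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setMaster("local[2]").setAppName("TransformJoinSketch"), Seconds(1))

        // Static RDD of (word, sentiment); a literal list stands in for AFINN-111.txt
        val staticSentiments = ssc.sparkContext
          .parallelize(Seq(("happy", 3), ("sad", -2)))
          .cache()

        // A queueStream of pre-built (word, count) batches stands in for the live source
        val batches = mutable.Queue(ssc.sparkContext.parallelize(Seq(("happy", 5), ("sad", 2))))
        val wordCounts = ssc.queueStream(batches)

        // transform() exposes each micro-batch as a plain RDD, so an ordinary
        // RDD-to-RDD join against the static data applies
        val scored = wordCounts
          .transform(batch => staticSentiments.join(batch)) // (word, (sentiment, count))
          .map { case (word, (sentiment, count)) => (word, sentiment * count) }

        scored.print()
        ssc.start()
        ssc.awaitTerminationOrTimeout(5000)
        ssc.stop()
      }
    }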

Author: Jeff L <sha0lin@alumni.carnegiemellon.edu>

Closes #8431 from Agent007/SPARK-9057.
Jeff L authored 2015-12-18 15:06:54 +00:00; committed by Sean Owen
commit ea59b0f3a6 (parent 2bebaa39d9)
4 changed files with 2830 additions and 0 deletions

data/streaming/AFINN-111.txt Normal file (+2477)

File diff suppressed because it is too large.
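For orientation: each line of AFINN-111 is a word (or short phrase), a tab, and an integer valence score between -5 and 5, which is what the line.split("\t") calls in the examples below rely on. Two representative entries, with scores as I recall them from the published list:

    abandon	-2
    happy	3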

examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java Normal file

@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples.streaming;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;
import scala.Tuple2;
import twitter4j.Status;
import java.util.Arrays;
import java.util.List;
/**
* Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
* the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN)
*/
public class JavaTwitterHashTagJoinSentiments {
public static void main(String[] args) {
if (args.length < 4) {
System.err.println("Usage: JavaTwitterHashTagJoinSentiments <consumer key> <consumer secret>" +
" <access token> <access token secret> [<filters>]");
System.exit(1);
}
StreamingExamples.setStreamingLogLevels();
String consumerKey = args[0];
String consumerSecret = args[1];
String accessToken = args[2];
String accessTokenSecret = args[3];
String[] filters = Arrays.copyOfRange(args, 4, args.length);
// Set the system properties so that the Twitter4j library used by the Twitter stream
// can use them to generate OAuth credentials
System.setProperty("twitter4j.oauth.consumerKey", consumerKey);
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret);
System.setProperty("twitter4j.oauth.accessToken", accessToken);
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret);
SparkConf sparkConf = new SparkConf().setAppName("JavaTwitterHashTagJoinSentiments");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
JavaReceiverInputDStream<Status> stream = TwitterUtils.createStream(jssc, filters);
JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() {
@Override
public Iterable<String> call(Status s) {
return Arrays.asList(s.getText().split(" "));
}
});
JavaDStream<String> hashTags = words.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String word) throws Exception {
return word.startsWith("#");
}
});
// Read in the word-sentiment list and create a static RDD from it
String wordSentimentFilePath = "data/streaming/AFINN-111.txt";
final JavaPairRDD<String, Double> wordSentiments = jssc.sparkContext().textFile(wordSentimentFilePath)
.mapToPair(new PairFunction<String, String, Double>(){
@Override
public Tuple2<String, Double> call(String line) {
String[] columns = line.split("\t");
return new Tuple2<String, Double>(columns[0],
Double.parseDouble(columns[1]));
}
});
JavaPairDStream<String, Integer> hashTagCount = hashTags.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
// leave out the # character
return new Tuple2<String, Integer>(s.substring(1), 1);
}
});
JavaPairDStream<String, Integer> hashTagTotals = hashTagCount.reduceByKeyAndWindow(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) {
return a + b;
}
}, new Duration(10000));
// Determine the hash tags with the highest sentiment values by joining the streaming RDD
// with the static RDD inside the transform() method and then multiplying
// the frequency of the hash tag by its sentiment value
JavaPairDStream<String, Tuple2<Double, Integer>> joinedTuples =
hashTagTotals.transformToPair(new Function<JavaPairRDD<String, Integer>,
JavaPairRDD<String, Tuple2<Double, Integer>>>() {
@Override
public JavaPairRDD<String, Tuple2<Double, Integer>> call(JavaPairRDD<String,
Integer> topicCount)
throws Exception {
return wordSentiments.join(topicCount);
}
});
JavaPairDStream<String, Double> topicHappiness = joinedTuples.mapToPair(
new PairFunction<Tuple2<String, Tuple2<Double, Integer>>, String, Double>() {
@Override
public Tuple2<String, Double> call(Tuple2<String,
Tuple2<Double, Integer>> topicAndTuplePair) throws Exception {
Tuple2<Double, Integer> happinessAndCount = topicAndTuplePair._2();
return new Tuple2<String, Double>(topicAndTuplePair._1(),
happinessAndCount._1() * happinessAndCount._2());
}
});
JavaPairDStream<Double, String> happinessTopicPairs = topicHappiness.mapToPair(
new PairFunction<Tuple2<String, Double>, Double, String>() {
@Override
public Tuple2<Double, String> call(Tuple2<String, Double> topicHappiness)
throws Exception {
return new Tuple2<Double, String>(topicHappiness._2(),
topicHappiness._1());
}
});
JavaPairDStream<Double, String> happiest10 = happinessTopicPairs.transformToPair(
new Function<JavaPairRDD<Double, String>, JavaPairRDD<Double, String>>() {
@Override
public JavaPairRDD<Double, String> call(JavaPairRDD<Double,
String> happinessAndTopics) throws Exception {
return happinessAndTopics.sortByKey(false);
}
}
);
// Print hash tags with the most positive sentiment values
happiest10.foreachRDD(new Function<JavaPairRDD<Double, String>, Void>() {
@Override
public Void call(JavaPairRDD<Double, String> happinessTopicPairs) throws Exception {
List<Tuple2<Double, String>> topList = happinessTopicPairs.take(10);
System.out.println(
String.format("\nHappiest topics in last 10 seconds (%s total):",
happinessTopicPairs.count()));
for (Tuple2<Double, String> pair : topList) {
System.out.println(
String.format("%s (%s happiness)", pair._2(), pair._1()));
}
return null;
}
});
jssc.start();
jssc.awaitTermination();
}
}
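As a usage note (not part of the patch): examples bundled with a Spark distribution are normally launched through the run-example script, which prefixes the org.apache.spark.examples package, so an invocation would look roughly like this, with the four credential arguments as placeholders:

    ./bin/run-example streaming.JavaTwitterHashTagJoinSentiments \
      <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> [filter ...]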

examples/src/main/python/streaming/network_wordjoinsentiments.py Normal file

@@ -0,0 +1,77 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Shows the most positive words in UTF8 encoded, '\n' delimited text received directly from the network
every 5 seconds. The streaming data is joined with a static RDD of the AFINN word list
(http://neuro.imm.dtu.dk/wiki/AFINN)
Usage: network_wordjoinsentiments.py <hostname> <port>
<hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
To run this on your local machine, you need to first run a Netcat server
`$ nc -lk 9999`
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/network_wordjoinsentiments.py \
localhost 9999`
"""
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def print_happiest_words(rdd):
    top_list = rdd.take(5)
    print("Happiest words in the last 5 seconds (%d total):" % rdd.count())
    for happiness, word in top_list:
        print("%s (%d happiness)" % (word, happiness))

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordjoinsentiments.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="PythonStreamingNetworkWordJoinSentiments")
    ssc = StreamingContext(sc, 5)

    # Read in the word-sentiment list and create a static RDD from it
    word_sentiments_file_path = "data/streaming/AFINN-111.txt"
    word_sentiments = ssc.sparkContext.textFile(word_sentiments_file_path) \
        .map(lambda line: tuple(line.split("\t")))

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

    word_counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)

    # Determine the words with the highest sentiment values by joining the streaming RDD
    # with the static RDD inside the transform() method and then multiplying
    # the frequency of each word by its sentiment value
    happiest_words = word_counts.transform(lambda rdd: word_sentiments.join(rdd)) \
        .map(lambda word_tuples: (word_tuples[0], float(word_tuples[1][0]) * word_tuples[1][1])) \
        .map(lambda word_happiness: (word_happiness[1], word_happiness[0])) \
        .transform(lambda rdd: rdd.sortByKey(False))

    happiest_words.foreachRDD(print_happiest_words)

    ssc.start()
    ssc.awaitTermination()
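To make the arithmetic concrete, using the sample AFINN scores quoted earlier (happy 3, sad -2): if a 5-second batch contains the text "happy happy sad", word_counts yields ('happy', 2) and ('sad', 1); the transform-time join produces ('happy', ('3', 2)) and ('sad', ('-2', 1)); and the two map steps turn those into (6.0, 'happy') and (-2.0, 'sad'), so print_happiest_words reports happy first after the descending sort.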

examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala Normal file

@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// scalastyle:off println
package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
* the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN)
*/
object TwitterHashTagJoinSentiments {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: TwitterHashTagJoinSentiments <consumer key> <consumer secret> " +
"<access token> <access token secret> [<filters>]")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
val filters = args.takeRight(args.length - 4)
// Set the system properties so that the Twitter4j library used by the Twitter stream
// can use them to generate OAuth credentials
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
val sparkConf = new SparkConf().setAppName("TwitterHashTagJoinSentiments")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val stream = TwitterUtils.createStream(ssc, None, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
// Read in the word-sentiment list and create a static RDD from it
val wordSentimentFilePath = "data/streaming/AFINN-111.txt"
val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
  val Array(word, happinessValue) = line.split("\t")
  (word, happinessValue.toInt)
}.cache()
// Determine the hash tags with the highest sentiment values by joining the streaming RDD
// with the static RDD inside the transform() method and then multiplying
// the frequency of the hash tag by its sentiment value
val happiest60 = hashTags.map(hashTag => (hashTag.tail, 1))
.reduceByKeyAndWindow(_ + _, Seconds(60))
.transform{topicCount => wordSentiments.join(topicCount)}
.map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
.map{case (topic, happinessValue) => (happinessValue, topic)}
.transform(_.sortByKey(false))
val happiest10 = hashTags.map(hashTag => (hashTag.tail, 1))
.reduceByKeyAndWindow(_ + _, Seconds(10))
.transform{topicCount => wordSentiments.join(topicCount)}
.map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
.map{case (topic, happinessValue) => (happinessValue, topic)}
.transform(_.sortByKey(false))
// Print hash tags with the most positive sentiment values
happiest60.foreachRDD(rdd => {
val topList = rdd.take(10)
println("\nHappiest topics in last 60 seconds (%s total):".format(rdd.count()))
topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
})
happiest10.foreachRDD(rdd => {
val topList = rdd.take(10)
println("\nHappiest topics in last 10 seconds (%s total):".format(rdd.count()))
topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
})
ssc.start()
ssc.awaitTermination()
}
}
// scalastyle:on println
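A design note on the Scala version: the cache() on the sentiment RDD matters because transform() re-runs the join on every batch interval, and here two windowed streams (60 seconds and 10 seconds) each join against it; without caching, AFINN-111.txt would be re-read and re-parsed from disk for every batch of both streams. The Java and Python variants, which skip the cache() call, would arguably benefit from the same treatment.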