-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathConsumer
More file actions
62 lines (48 loc) · 1.94 KB
/
Copy pathConsumer
File metadata and controls
62 lines (48 loc) · 1.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
KB
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from kafka import KafkaConsumer
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk import tokenize
from datetime import datetime
from elasticsearch import Elasticsearch
from random import randint
esearch = Elasticsearch()
latlist = [{"lat" : 41.9205648,"lon" : -95.6334317},{"lat" :38.4247884,"lon" : -121.5157185},{"lat" : 32.6008611,"lon":-88.9249393},{"lat" : 44.123874,"lon" : -122.7816025}]
def es_object(rdd):
for x in rdd.collect():
obj = {}
obj['text'] = x
senala = sid.polarity_scores(x)['compound']
if senala > 0:
obj['positive']=1
elif senala < 0:
obj['negative']=1
else:
obj['neutral']=1
obj['timestamp'] = datetime.now()
obj['location'] = latlist[randint(0,3)]
esearch.index(index="twitterstream", doc_type="test-type", body=obj)
if __name__ == "__main__":
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)
zkQuorum = "localhost:2181"
topic = "twitterstream"
sid = SentimentIntensityAnalyzer()
consumer = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = consumer.map(lambda x: x[1].encode("ascii","ignore"))
#counts = lines.map(lambda line : sid.polarity_scores(str(line))['compound'])
#sentiments = counts.map(lambda x : "neagative" if x < 0 else "positive" if x > 0 else "neutral" )
lines.foreachRDD(es_object)
ssc.start()
ssc.awaitTermination()
"""#consumer = KafkaConsumer(bootstrap_servers='localhost:9092',auto_offset_reset='earliest')
consumer.subscribe(['twitterstream'])
for message in consumer:
print ("----------",message.value)
sid = SentimentIntensityAnalyzer()
ss = sid.polarity_scores(str(message))
print(ss['compound'])"""