forked from Soben713/Twizhoosh
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtwitter_related_scripts_runner.py
123 lines (97 loc) · 4.33 KB
/
twitter_related_scripts_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/python
# -*- coding: utf-8 -*-
import time
from twython import *
from twython.exceptions import TwythonError, TwythonRateLimitError
from core.utils.sample_twitter_responses import sample_tweet, sample_direct_message
from core.utils.singleton import Singleton
from core.utils.logging import log
from core import settings, script_loader
class ParseStreamingData():
@staticmethod
def get_type_of_data(data):
keys = list(data.keys())
if len(keys) == 1:
# Many streaming api data like messages, friends etc have only one key
return keys[0]
if 'event' in data:
# See list of events here:
# https://dev.twitter.com/streaming/overview/messages-types#user_stream_messsages
return data['event']
if 'text' in data and 'favorite_count' in data:
if data['user']['screen_name'] == settings.TWIZHOOSH_USERNAME:
return 'self_status_update'
return 'timeline_update'
# Add any other types that I forgot (didn't care about)
return None
class StreamingSingleton(TwythonStreamer, metaclass=Singleton):
# dict of events to list of subscribing script instances
scripts = {}
def __init__(self, *args, **kwargs):
super(StreamingSingleton, self).__init__(
settings.CONSUMER_KEY, settings.CONSUMER_SECRET, settings.OAUTH_TOKEN, settings.OAUTH_TOKEN_SECRET,
*args, **kwargs
)
self.load_scripts()
def load_scripts(self):
"""
Loads scripts to self.scripts
"""
script_classes = script_loader.load_scripts('scripts.twitter_related',
settings.INSTALLED_TWITTER_RELATED_SCRIPTS)
log("Loading twitter related scripts")
for script in script_classes:
listen_to = script.listen_to if isinstance(script.listen_to, list) else [script.listen_to]
for event in listen_to:
self.scripts.setdefault(event, []).append(script())
log("Loaded {0} for type {1}".format(script.__name__, event))
def on_success(self, data):
data_type = ParseStreamingData.get_type_of_data(data)
if not data_type:
log("Type of data unknown \n {0}".format(data))
return
log("Data received, with type: " + data_type)
log("Data: {0}".format(data))
for script in self.scripts.setdefault(data_type, []):
log("Data type: {1} script found: {0}".format(script.__class__.__name__, data_type))
try:
getattr(script, 'on_' + data_type)(data)
except TwythonError as e:
log("Twython error when occured when running script {0}\nError is:{1}".format(
script.__class__.__name__, e))
def on_error(self, status_code, data):
log("Error streaming [{0}]: {1}".format(status_code, data))
def user(self, *args, **kwargs):
while True:
try:
super(StreamingSingleton, self).user(self, *args, **kwargs)
except TwythonRateLimitError as e:
log("Rate limit error, asks to retry after {0}".format(e.retry_after))
time.sleep(min(int(e.retry_after), 5))
except TwythonError as e:
log("Twython error {0}".format(e))
class Testing(StreamingSingleton):
def __init__(self, *args, **kwargs):
self.load_scripts()
def user(self, *args, **kwargs):
while True:
input_type = input("Enter a data type number \n 1. Timeline \n 2. Direct Message \n 3. Custom data \n")
if input_type == '1':
timeline_text = input("Enter a tweet\n")
data = sample_tweet
data['text'] = timeline_text
elif input_type == '2':
message_text = input("Enter a direct message to @" + settings.TWIZHOOSH_USERNAME + "\n")
data = sample_direct_message
data['direct_message']['text'] = message_text
else:
data = eval(input("Enter your data object \n"))
self.on_success(data)
def run():
if not settings.DEBUG:
log("Starting streamer...")
stream = StreamingSingleton()
stream.user(replies="all")
else:
stream = Testing()
stream.user(replies="all")