-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathfollowing_monitor.py
97 lines (82 loc) · 4.8 KB
/
following_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import time
from typing import List, Union, Tuple, Dict
from monitor_base import MonitorBase
from utils import find_all, find_one, get_cursor, get_content
class FollowingMonitor(MonitorBase):
    """Monitor that reports changes in the set of accounts a user follows.

    A snapshot of the following list is taken when the monitor is created.
    Every call to :meth:`watch` fetches a fresh snapshot, sends one message
    per followed / unfollowed account, and then stores the new snapshot.

    NOTE(review): ``self.user_id``, ``self.logger``, ``self.twitter_watcher``,
    ``send_message``, ``update_last_watch_time`` and ``get_last_watch_time``
    are used here but not defined in this class; presumably they are provided
    by ``MonitorBase`` -- confirm there.
    """

    monitor_type = 'Following'

    def __init__(self, username: str, token_config: dict, user_config: dict, cookies_dir: str):
        """Initialise the base monitor and take the first following snapshot.

        All arguments are forwarded unchanged to ``MonitorBase.__init__``.
        """
        super().__init__(monitor_type=self.monitor_type,
                         username=username,
                         token_config=token_config,
                         user_config=user_config,
                         cookies_dir=cookies_dir)
        self.following_dict = self.get_all_following(self.user_id)
        screen_names = [find_one(following, 'screen_name') for following in self.following_dict.values()]
        self.logger.info('Init following monitor succeed.\nUser id: {}\nFollowing {} users: {}'.format(
            self.user_id, len(self.following_dict), screen_names))

    def get_all_following(self, user_id: int) -> Dict[str, dict]:
        """Fetch every account followed by ``user_id``, following pagination.

        Args:
            user_id: Id of the user whose following list is queried.

        Returns:
            A dict mapping each followed account's ``rest_id`` to the raw
            ``user_results`` entry extracted from the API response.
        """
        # Only needed for dumping unexpected responses; kept function-local
        # because the module does not import json at file level.
        import json

        api_name = 'Following'
        params = {'userId': user_id, 'includePromotedContent': True, 'count': 1000}
        following_dict = dict()
        while True:
            json_response = self.twitter_watcher.query(api_name, params)
            following_list = find_all(json_response, 'user_results')
            # A response with neither users nor a 'result' node is treated as
            # a failed request: log it and retry the same page until it works.
            while not following_list and not find_one(json_response, 'result'):
                self.logger.error(json.dumps(json_response, indent=2))
                time.sleep(10)
                json_response = self.twitter_watcher.query(api_name, params)
                following_list = find_all(json_response, 'user_results')
            for following in following_list:
                # Use a distinct name so the ``user_id`` parameter is not shadowed.
                rest_id = find_one(following, 'rest_id')
                following_dict[rest_id] = following
            cursor = get_cursor(json_response)
            # No cursor, or a cursor starting with '-1|' / '0|', ends pagination.
            if not cursor or cursor.startswith(('-1|', '0|')):
                break
            # Receiving the cursor we just sent would re-request the same page
            # forever, so stop instead of looping.
            if cursor == params.get('cursor'):
                break
            params['cursor'] = cursor
        return following_dict

    def parse_user_details(self, user: dict) -> Tuple[str, str]:
        """Build a human-readable summary and profile image URL for ``user``.

        Args:
            user: A raw per-user entry as stored in the following snapshot.

        Returns:
            A ``(details, image_url)`` tuple. ``details`` is a multi-line
            string (name, bio, website, join date and counters) and
            ``image_url`` is ``profile_image_url_https`` with ``'_normal'``
            removed, or ``''`` when the field is absent.
        """
        content = get_content(user)
        # Each level may be missing, None, or (for 'urls') an empty list;
        # fall back at every step so a sparse profile cannot raise.
        entities = content.get('entities') or {}
        url_entity = entities.get('url') or {}
        urls = url_entity.get('urls') or [{}]
        website = urls[0].get('expanded_url', '')

        details_str = '\n'.join([
            'Name: {}'.format(content.get('name', '')),
            'Bio: {}'.format(content.get('description', '')),
            'Website: {}'.format(website),
            'Joined at: {}'.format(content.get('created_at', '')),
            'Following: {}'.format(content.get('friends_count', -1)),
            'Followers: {}'.format(content.get('followers_count', -1)),
            'Tweets: {}'.format(content.get('statuses_count', -1)),
        ])
        profile_image_url = (content.get('profile_image_url_https') or '').replace('_normal', '')
        return details_str, profile_image_url

    def detect_changes(self, old_following_dict: Dict[str, dict], new_following_dict: Dict[str, dict]) -> bool:
        """Compare two following snapshots and send a message per change.

        Args:
            old_following_dict: Previous snapshot, keyed by user id.
            new_following_dict: Fresh snapshot, keyed by user id.

        Returns:
            ``True`` when the snapshots have the same ids or the differences
            were reported. ``False`` when either the added or removed count
            exceeds ``max(len(old) / 2, 10)``; nothing is sent in that case.
        """
        old_ids = old_following_dict.keys()
        new_ids = new_following_dict.keys()
        if old_ids == new_ids:
            return True
        max_changes = max(len(old_following_dict) / 2, 10)
        dec_user_id_list = old_ids - new_ids
        inc_user_id_list = new_ids - old_ids
        # An implausibly large diff is rejected so the caller keeps the old snapshot.
        if len(dec_user_id_list) > max_changes or len(inc_user_id_list) > max_changes:
            return False
        self._report_changes('Unfollow', dec_user_id_list, old_following_dict)
        self._report_changes('Follow', inc_user_id_list, new_following_dict)
        return True

    def _report_changes(self, action: str, user_ids: set, following_dict: Dict[str, dict]) -> None:
        """Log ``user_ids`` and send one ``action`` message for each of them.

        Args:
            action: Message prefix, ``'Follow'`` or ``'Unfollow'``.
            user_ids: Ids that changed; nothing happens when it is empty.
            following_dict: Snapshot used to look up each changed user.
        """
        if not user_ids:
            return
        self.logger.info('{}: {}'.format(action, user_ids))
        for user_id in user_ids:
            user = following_dict[user_id]
            message = '{}: @{}'.format(action, find_one(user, 'screen_name'))
            details_str, profile_image_url = self.parse_user_details(user)
            if details_str:
                message += '\n{}'.format(details_str)
            self.send_message(message=message, photo_url_list=[profile_image_url] if profile_image_url else [])

    def watch(self) -> bool:
        """Fetch a fresh snapshot, report changes and store the snapshot.

        Returns:
            ``False`` when :meth:`detect_changes` rejects the new snapshot
            (the stored snapshot and last watch time are left untouched),
            otherwise ``True``.
        """
        following_dict = self.get_all_following(self.user_id)
        if not self.detect_changes(self.following_dict, following_dict):
            return False
        self.following_dict = following_dict
        self.update_last_watch_time()
        return True

    def status(self) -> str:
        """Return a one-line summary: last watch time and following count."""
        return 'Last: {}, number: {}'.format(self.get_last_watch_time(), len(self.following_dict))