# updater.py (449 lines, 365 loc, 15 KB)
# Forked from LizardByte/ThemerrDB.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
# standard imports
import argparse
from datetime import datetime
import json
from operator import itemgetter
import os
from queue import Queue
import re
import threading
import time
from typing import Callable, Optional
# lib imports
from igdb.wrapper import IGDBWrapper
import requests
import youtube_dl
# load env
from dotenv import load_dotenv
load_dotenv()
# databases
# One entry per supported database: where its item files live on disk, the
# item type stored there, and the list of `{id, title}` entries that is filled
# while the queue is processed.
databases = {
    'igdb': {
        'all_items': [],
        'path': os.path.join('database', 'games', 'igdb'),
        'type': 'game',
    },
    'tmdb': {
        'all_items': [],
        'path': os.path.join('database', 'movies', 'themoviedb'),
        'type': 'movie',
    },
}

# movies are additionally stored under their imdb id in this directory
imdb_path = os.path.join('database', 'movies', 'imdb')

# setup queue
queue = Queue()
def igdb_authorization(client_id: str, client_secret: str) -> dict:
    """
    Get the igdb authorization.

    Posts the client credentials to the Twitch OAuth token endpoint and returns the decoded JSON reply.

    Parameters
    ----------
    client_id : str
        Twitch developer client id.
    client_secret : str
        Twitch developer client secret.

    Returns
    -------
    dict
        Dictionary containing access token and expiration.

    Raises
    ------
    requests.exceptions.RequestException
        If the request cannot be completed within the timeout, or the endpoint answers with an error status.
    """
    token_url = 'https://id.twitch.tv/oauth2/token'

    # the credentials are form fields; ``Accept`` is a real HTTP header, not part of the form
    auth_data = dict(
        client_id=client_id,
        client_secret=client_secret,
        grant_type='client_credentials',
    )

    response = requests.post(
        url=token_url,
        data=auth_data,
        headers=dict(Accept='application/json'),
        timeout=30,  # never hang forever on an unresponsive endpoint
    )

    # fail here with a clear http error instead of a later KeyError on 'access_token'
    response.raise_for_status()

    return response.json()
# setup igdb authorization and wrapper
# Both are created once, when the module is loaded, from the Twitch
# credentials found in the environment.
auth = igdb_authorization(
    client_id=os.environ["TWITCH_CLIENT_ID"],
    client_secret=os.environ["TWITCH_CLIENT_SECRET"],
)
wrapper = IGDBWrapper(
    client_id=os.environ["TWITCH_CLIENT_ID"],
    auth_token=auth['access_token'],
)
def requests_loop(url: str, method: Callable = requests.get, max_tries: int = 10) -> requests.Response:
    """
    Call ``method`` on ``url``, retrying with exponential back-off.

    A request is retried when it raises a ``RequestException``, or when the response status is 429 or a
    server error (>= 500). Any other response is returned right away, so the caller can inspect its status.
    The wait before retry number ``n`` is ``2**n`` seconds. There is no wait after the final attempt.

    Parameters
    ----------
    url : str
        The url to request.
    method : Callable
        The requests method to call, it must accept a ``url`` keyword argument.
    max_tries : int
        The number of retries after the first attempt.

    Returns
    -------
    requests.Response
        The first response that does not need a retry, or the last response received once the tries are
        used up.

    Raises
    ------
    requests.exceptions.RequestException
        If no attempt produced a response at all.
    """
    attempts = max(max_tries, 0) + 1
    response = None
    last_error = None

    for count in range(attempts):
        try:
            response = method(url=url)
        except requests.exceptions.RequestException as e:
            last_error = e
        else:
            if response.status_code == requests.codes.ok:
                return response

            # client errors (e.g. 404) will not get better by asking again
            retryable = response.status_code == 429 or response.status_code >= 500
            if not retryable:
                return response

        if count < attempts - 1:
            time.sleep(2 ** count)

    if response is not None:
        return response  # let the caller report the bad status code

    if last_error is not None:
        raise last_error

    # the url is left out of the message on purpose, it may contain an api key
    raise requests.exceptions.RequestException('request was never attempted')
def process_queue() -> None:
    """
    Process items from the queue.

    This is an endless loop that takes the next item off the queue and passes it to ``queue_handler``.
    An item that fails is reported and skipped, and every item is marked as done whether it succeeded or
    not. Otherwise a single failure would end the worker and leave ``queue.join()`` waiting forever.

    Examples
    --------
    >>> threads = threading.Thread(target=process_queue, daemon=True)
    ...
    """
    while True:
        item = queue.get()
        try:
            queue_handler(item=item)  # process the item from the queue
        except Exception as e:
            # keep this worker alive for the remaining items
            print(f'Error processing queue item {item}: {e}')
        finally:
            queue.task_done()  # tells the queue that we are done with this item
def queue_handler(item: tuple) -> None:
    """
    Process one queued item and record it in the matching ``all_items`` list.

    Parameters
    ----------
    item : tuple
        The item type (``'game'`` or ``'movie'``) followed by the item id.
    """
    item_type, item_id = item[0], item[1]
    data = process_item_id(item_type=item_type, item_id=item_id)

    if item_type == 'game':
        # igdb refers to this as name, but let's use title for consistency
        entry = dict(id=data['id'], title=data['name'])
        databases['igdb']['all_items'].append(entry)
    elif item_type == 'movie':
        entry = dict(id=data['id'], title=data['title'])
        databases['tmdb']['all_items'].append(entry)
# create multiple threads for processing themes faster
for _ in range(10):  # number of threads
    try:
        worker = threading.Thread(target=process_queue)
        # a daemon thread ends when the main thread ends
        worker.daemon = True
        worker.start()
    except RuntimeError as r_e:
        print(f'RuntimeError encountered: {r_e}')
        break
def _fetch_igdb_game(item_id: Optional[int] = None, game_slug: Optional[str] = None) -> dict:
    """Return the igdb record of one game, looked up by slug when given, otherwise by id."""
    if game_slug:
        where_type = 'slug'
        where = f'"{game_slug}"'
    elif item_id:
        where_type = 'id'
        where = item_id
    else:
        raise Exception('game_slug or game_id is required')

    print(f'Searching igdb for {where_type}: {where}')

    fields = [
        'cover.url',
        'name',
        'release_dates.y',
        'slug',
        'summary',
        'url'
    ]
    limit = 1
    offset = 0

    byte_array = wrapper.api_request(
        endpoint='games',
        query=f'fields {", ".join(fields)}; where {where_type} = ({where}); limit {limit}; offset {offset};'
    )
    json_result = json.loads(byte_array)  # this is a list of dictionaries

    try:
        json_result[0]['id']
    except (KeyError, IndexError) as e:
        raise Exception(f'Error getting game id: {e}') from e

    return json_result[0]


def _fetch_tmdb_movie(item_id: Optional[int]) -> dict:
    """Return the themoviedb record of one movie id."""
    if not item_id:
        raise Exception('item_id is required for movies')

    # get the data from tmdb api
    url = f'https://api.themoviedb.org/3/movie/{item_id}?api_key={os.environ["TMDB_API_KEY_V3"]}'
    response = requests_loop(url=url, method=requests.get)

    if response is None:
        raise Exception(f'tmdb api did not return a response for movie id: {item_id}')
    if response.status_code != 200:
        raise Exception(f'tmdb api returned a non 200 status code of: {response.status_code}')

    return response.json()


def _write_issue_files(item_type: str, json_data: dict) -> None:
    """Write the issue title to ``title.md`` and append the property table to ``comment.md``."""
    if item_type == 'game':
        label = 'GAME'
        title = json_data['name']
        year = json_data['release_dates'][0]['y']
        summary = json_data['summary']
        poster = f"https:{json_data['cover']['url'].replace('/t_thumb/', '/t_cover_big/')}"
    else:
        label = 'MOVIE'
        title = json_data['title']
        year = json_data['release_date'][0:4]
        summary = json_data['overview']
        poster = f"https://image.tmdb.org/t/p/w185{json_data['poster_path']}"

    issue_title = f"[{label}]: {title} ({year})"

    # the empty first and last elements give the table a leading and a trailing newline
    issue_comment = '\n'.join([
        '',
        '| Property | Value |',
        '| --- | --- |',
        f'| title | {title} |',
        f'| year | {year} |',
        f'| summary | {summary} |',
        f"| id | {json_data['id']} |",
        f'| poster | ![poster]({poster}) |',
        '',
    ])

    # the comment is appended, the title is replaced
    with open("comment.md", "a", encoding='utf-8') as comment_f:
        comment_f.write(issue_comment)

    with open("title.md", "w", encoding='utf-8') as title_f:
        title_f.write(issue_title)


def _stamp_submission(og_data: dict, database_path: str) -> None:
    """Record when and by whom the theme was added and edited, then update the contributor counts."""
    # update dates
    # time.time() is the epoch in UTC regardless of the local timezone
    now = int(time.time())
    og_data.setdefault('youtube_theme_added', now)
    og_data['youtube_theme_edited'] = now

    # update user ids
    author = os.environ['ISSUE_AUTHOR_USER_ID']
    original_submission = 'youtube_theme_added_by' not in og_data
    if original_submission:
        og_data['youtube_theme_added_by'] = author
    og_data['youtube_theme_edited_by'] = author

    # update contributor info
    update_contributor_info(original=original_submission, base_dir=database_path)


def process_item_id(item_type: str,
                    item_id: Optional[int] = None,
                    game_slug: Optional[str] = None,
                    youtube_url: Optional[str] = None) -> dict:
    """
    Fetch the latest data of a game or movie and write it to the database files.

    The data is requested from igdb (games) or themoviedb (movies), merged over the data currently saved
    for the item, and written to ``<id>.json`` in the database directory. Movies that have an ``imdb_id``
    are written to the imdb directory as well.

    When the script was started with ``--issue_update``, the issue title and comment files are written,
    the added/edited timestamps and user ids are updated, and the contributor counts are updated.

    Parameters
    ----------
    item_type : str
        Either ``'game'`` or ``'movie'``.
    item_id : Optional[int]
        The igdb or themoviedb id. Required for movies, and for games when no slug is given.
    game_slug : Optional[str]
        The igdb slug of the game. Takes precedence over ``item_id``.
    youtube_url : Optional[str]
        The theme url to store for the item. Nothing is stored when it is empty.

    Returns
    -------
    dict
        The merged data that was written to disk.

    Raises
    ------
    Exception
        If the item type is not supported, the required id/slug is missing, or the api does not return
        usable data.
    """
    if item_type == 'game':
        database_path = databases['igdb']['path']
        json_data = _fetch_igdb_game(item_id=item_id, game_slug=game_slug)
        item_id = json_data['id']  # a lookup by slug only learns the id here
    elif item_type == 'movie':
        database_path = databases['tmdb']['path']
        json_data = _fetch_tmdb_movie(item_id=item_id)
    else:
        raise Exception(f'Unsupported item_type: {item_type}')

    og_data = dict()
    item_file = os.path.join(database_path, f"{item_id}.json")
    if os.path.isfile(item_file):
        with open(file=item_file, mode='r', encoding='utf-8') as og_f:
            og_data = json.load(fp=og_f)  # get currently saved data

    print(f'processing {item_type} id {item_id}')

    if 'id' not in json_data:
        raise Exception(f"Error processing {item_type}: 'id' is missing from the api response")

    # ``args`` only exists when this file is run as a script
    cli_args = globals().get('args')
    if cli_args is not None and cli_args.issue_update:
        # create the issue comment and title files
        _write_issue_files(item_type=item_type, json_data=json_data)
        _stamp_submission(og_data=og_data, database_path=database_path)

    # update the existing dictionary with new values from json_data
    og_data.update(json_data)
    if youtube_url:
        og_data['youtube_theme_url'] = youtube_url

    # clean old data
    clean_old_data(data=og_data, item_type=item_type)

    # set the item filename
    destination_filenames = [os.path.join(database_path, f'{og_data["id"]}.json')]

    if item_type == 'movie':
        try:
            if og_data["imdb_id"]:
                # set the item filename
                destination_filenames.append(os.path.join(imdb_path, f'{og_data["imdb_id"]}.json'))
        except KeyError as e:
            print(f'Error getting imdb_id: {e}')

    for filename in destination_filenames:
        destination_dir = os.path.dirname(filename)
        os.makedirs(name=destination_dir, exist_ok=True)  # create directory if it doesn't exist

        with open(filename, "w", encoding='utf-8') as dest_f:
            json.dump(obj=og_data, indent=4, fp=dest_f, sort_keys=True)

    return og_data
def clean_old_data(data: dict, item_type: str) -> None:
    """
    Remove keys that are no longer used from an item, in place.

    Parameters
    ----------
    data : dict
        The item data to clean.
    item_type : str
        The type of the item, only ``'game'`` items are changed.
    """
    if item_type != 'game':
        return

    # remove old data, it is fine if the key is already gone
    data.pop('igdb_id', None)
def update_contributor_info(original: bool, base_dir: str) -> None:
    """
    Count a submission for the issue author in ``contributors.json``.

    The file lives in the parent directory of ``base_dir``. The author is taken from the
    ``ISSUE_AUTHOR_USER_ID`` environment variable. A missing file is treated as having no contributors.

    Parameters
    ----------
    original : bool
        True if the author added a new item, False if the author edited an existing one.
    base_dir : str
        The database directory of the item.
    """
    contributor_file_path = os.path.join(os.path.dirname(base_dir), 'contributors.json')
    user_id = os.environ['ISSUE_AUTHOR_USER_ID']

    try:
        with open(contributor_file_path, 'r') as contributor_f:
            contributor_data = json.load(contributor_f)
    except FileNotFoundError:
        contributor_data = {}  # first contribution to this database

    # new contributors start at zero, so a first-time edit is not counted as an added item
    counts = contributor_data.setdefault(user_id, dict(items_added=0, items_edited=0))
    counts['items_added' if original else 'items_edited'] += 1

    with open(contributor_file_path, 'w') as contributor_f:
        json.dump(obj=contributor_data, indent=4, fp=contributor_f, sort_keys=True)
def process_issue_update() -> None:
    """
    Process a theme submission made through an issue.

    Loads the submission, checks the YouTube url, then hands over to the igdb or themoviedb check
    depending on the ``--game`` / ``--movie`` argument.

    Raises
    ------
    SystemExit
        If neither ``--game`` nor ``--movie`` was passed.
    """
    # process submission file
    submission = process_submission()

    # check validity of provided YouTube url and update item dictionary
    youtube_url = check_youtube(data=submission)

    if args.game:
        # check validity of IGDB url and update item dictionary
        check_igdb(data=submission, youtube_url=youtube_url)
        return

    if args.movie:
        check_themoviedb(data=submission, youtube_url=youtube_url)
        return

    raise SystemExit('item_type not defined. Invalid label?')
def check_igdb(data: dict, youtube_url: Optional[str]) -> None:
    """
    Extract the game slug from the submitted igdb url and process the game.

    Parameters
    ----------
    data : dict
        The submission, the url is read from its ``igdb_url`` key.
    youtube_url : Optional[str]
        The theme url to store for the game.

    Raises
    ------
    Exception
        If no game slug can be found in the url.
    """
    print('Checking igdb')
    url = data['igdb_url'].strip()
    print(f'igdb_url: {url}')

    # The slug is the single path segment after /games/. It stops at a slash, query string or fragment.
    # Double quotes are excluded because the slug is placed inside a quoted value of the igdb query.
    match = re.search(r'https://www\.igdb\.com/games/([^/?#"\s]+)', url)
    if match is None:
        raise Exception(f'Unable to find a game slug in igdb url: {url}')

    game_slug = match.group(1)
    print(f'game_slug: {game_slug}')

    process_item_id(item_type='game', game_slug=game_slug, youtube_url=youtube_url)
def check_themoviedb(data: dict, youtube_url: Optional[str]) -> None:
    """
    Extract the movie id from the submitted themoviedb url and process the movie.

    Parameters
    ----------
    data : dict
        The submission, the url is read from its ``themoviedb_url`` key.
    youtube_url : Optional[str]
        The theme url to store for the movie.

    Raises
    ------
    Exception
        If no movie id can be found in the url.
    """
    print('Checking themoviedb')
    url = data['themoviedb_url'].strip()
    print(f'themoviedb_url: {url}')

    # the id is the run of digits right after /movie/, an optional "-title" suffix is ignored
    match = re.search(r'https://www\.themoviedb\.org/movie/(\d+)', url)
    if match is None:
        raise Exception(f'Unable to find a movie id in themoviedb url: {url}')

    themoviedb_id = int(match.group(1))
    print(f'themoviedb_id: {themoviedb_id}')

    process_item_id(item_type='movie', item_id=themoviedb_id, youtube_url=youtube_url)
def check_youtube(data: dict) -> Optional[str]:
    """
    Check the submitted YouTube url and return the url of the video page.

    The url is resolved with youtube_dl without downloading anything. For a playlist, the first video
    is used. On failure the error is printed and written to ``comment.md``.

    Parameters
    ----------
    data : dict
        The submission, the url is read from its ``youtube_theme_url`` key.

    Returns
    -------
    Optional[str]
        The ``webpage_url`` reported by youtube_dl, or None if the url could not be processed.
    """
    url = data['youtube_theme_url'].strip()

    # url provided, now process it using youtube_dl
    youtube_dl_params = dict(
        outtmpl='%(id)s.%(ext)s',
        youtube_include_dash_manifest=False,
    )

    webpage_url = None
    error = None

    with youtube_dl.YoutubeDL(params=youtube_dl_params) as ydl:
        try:
            result = ydl.extract_info(
                url=url,
                download=False  # We just want to extract the info
            )
        except youtube_dl.utils.DownloadError as e:
            error = e
        else:
            # Can be a playlist or a list of videos, otherwise it is just a video
            entries = result['entries'] if 'entries' in result else [result]
            if entries:
                webpage_url = entries[0]['webpage_url']
            else:
                error = 'No videos were found at the provided url.'

    if error is not None:
        print(f'Error processing youtube url: {error}')
        with open("comment.md", "w") as exceptions_f:
            exceptions_f.write(f'# :bangbang: **Exception Occurred** :bangbang:\n\n```txt\n{error}\n```\n\n')

    return webpage_url
def process_submission() -> dict:
    """
    Load the submission from ``submission.json`` in the current working directory.

    Returns
    -------
    dict
        The decoded content of the file.
    """
    # decode as utf-8 explicitly, the locale default would garble non-ascii titles on some systems
    with open(file='submission.json', encoding='utf-8') as submission_f:
        return json.load(submission_f)
if __name__ == '__main__':
    # setup arguments using argparse
    parser = argparse.ArgumentParser(description="Add theme song to database.")
    parser.add_argument('--daily_update', action='store_true', help='Run in daily update mode.')
    parser.add_argument('--issue_update', action='store_true', help='Run in issue update mode.')
    parser.add_argument('--game', action='store_true', help='Add Game theme song.')
    parser.add_argument('--movie', action='store_true', help='Add Movie theme song.')

    args = parser.parse_args()

    if args.issue_update:
        if not args.game and not args.movie:
            raise Exception('"--game" or "--movie" arg must be passed.')
        elif args.game and args.movie:
            raise Exception('"--game" or "--movie" arg must be passed, not both.')

        process_issue_update()

    elif args.daily_update:
        import shutil

        # migration tasks
        # todo - this loop can be deleted
        for db in databases:
            db_path = databases[db]['path']

            # move contributors.json to ../_contributors.json
            contributor_file = os.path.join(db_path, 'contributors.json')
            if os.path.isfile(contributor_file):
                shutil.move(src=contributor_file,
                            dst=os.path.join(os.path.dirname(db_path), 'contributors.json'))

            # remove all.json file
            all_file = os.path.join(db_path, 'all.json')
            if os.path.isfile(all_file):
                os.remove(all_file)

        # queue every item file of every database, the worker threads refresh them
        for db in databases:
            for next_item_file in os.listdir(path=databases[db]['path']):
                next_item_id, extension = os.path.splitext(next_item_file)
                if extension != '.json':
                    continue  # only `<id>.json` files are items
                queue.put((databases[db]['type'], next_item_id))

        # finish queue before writing `all` files
        queue.join()

        items_per_page = 10
        for db in databases:
            output_dir = os.path.dirname(databases[db]['path'])

            all_items = sorted(databases[db]['all_items'], key=itemgetter('title'), reverse=False)
            chunks = [all_items[x:x + items_per_page] for x in range(0, len(all_items), items_per_page)]

            # pages are numbered from 1, in title order
            for page_number, chunk in enumerate(chunks, start=1):
                chunk_file = os.path.join(output_dir, f'all_page_{page_number}.json')
                with open(file=chunk_file, mode='w') as chunk_f:
                    json.dump(obj=chunk, fp=chunk_f)

            pages = dict(pages=len(chunks))
            pages_file = os.path.join(output_dir, 'pages.json')
            with open(file=pages_file, mode='w') as pages_f:
                json.dump(obj=pages, fp=pages_f)