-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdump.py
115 lines (92 loc) · 3.13 KB
/
dump.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import asyncio
import json
import logging
import backoff
from redis import asyncio as aioredis
from placedump.common import (
ctx_aioredis,
get_async_gql_client,
get_token,
handle_backoff,
)
from placedump.constants import config_gql, socket_key, sub_gql
from placedump.tasks.parse import parse_message
log = logging.getLogger(__name__)
tasks = []
parsers = {}
async def get_meta() -> dict:
async with ctx_aioredis() as redis:
result = await redis.hgetall("place:meta")
return result or {}
async def push_to_key(redis: aioredis.Redis, key: str, payload: dict, canvas_id: int):
await redis.xadd(key, payload, maxlen=2000000)
message = payload["message"]
await redis.publish(key, message)
parse_message.delay(message, canvas_id)
async def main():
tasks.append(asyncio.create_task(graphql_parser("config")))
tasks.append(asyncio.create_task(parser_launcher()))
await asyncio.gather(*tasks)
async def parser_launcher():
while True:
meta = await get_meta()
highest_board = int(meta.get("index", "0"))
if len(parsers) < highest_board + 1:
log.info("meta canvas count update, new meta is")
log.info(meta)
for x in range(0, highest_board + 1):
if x not in parsers:
task = asyncio.create_task(graphql_parser(x))
parsers[x] = task
tasks.append(task)
await asyncio.sleep(1)
@backoff.on_exception(
backoff.fibo,
Exception,
max_time=30,
on_backoff=handle_backoff,
on_giveup=handle_backoff,
)
async def graphql_parser(canvas_id):
# pick the corrent gql schema and pick variables for canvas / config grabs.
if canvas_id == "config":
schema = config_gql
variables = {
"input": {
"channel": {
"category": "CONFIG",
"teamOwner": "GARLICBREAD",
}
}
}
else:
schema = sub_gql
variables = {
"input": {
"channel": {
"category": "CANVAS",
"teamOwner": "GARLICBREAD",
"tag": str(canvas_id),
}
}
}
# Using `async with` on the client will start a connection on the transport
# and provide a `session` variable to execute queries on this connection
log.info("socket connecting for canvas %s", canvas_id)
async with ctx_aioredis() as redis:
async with get_async_gql_client() as session:
log.info("socket connected for canvas %s", canvas_id)
async for result in session.subscribe(schema, variable_values=variables):
# append canvas id to messages
result["canvas_id"] = canvas_id
await push_to_key(
redis,
socket_key,
{
"message": json.dumps(result),
"type": "text",
},
canvas_id=canvas_id,
)
if __name__ == "__main__":
asyncio.run(main())