-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathPubSubClient.py
41 lines (36 loc) · 1.42 KB
/
PubSubClient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import asyncio
import json
import websockets
import queue
class PubSubClient:
    """Small asyncio client for the Twitch PubSub websocket endpoint.

    Typical use: ``await connect()`` once, then run ``receive()`` and
    ``heartbeat()`` as concurrent tasks and pull incoming frames with
    ``getNextMessage()``.
    """

    def __init__(self, accessToken, topics):
        """Store the credentials and topics; no network I/O happens here.

        Args:
            accessToken: Token sent as ``auth_token`` in the LISTEN request.
            topics: Topic name, or iterable of topic names, to subscribe to.
        """
        self._url = 'wss://pubsub-edge.twitch.tv'
        self._topics = topics
        self._accessToken = accessToken
        # Raw frames received by receive(), consumed by getNextMessage().
        self._messages = queue.Queue()
        # Set by connect(); None until then so heartbeat() can detect
        # "not connected yet" instead of failing on a missing attribute.
        self._connection = None

    def _build_listen_message(self):
        """Return the JSON-encoded LISTEN request for the configured topics."""
        topics = self._topics
        # A bare string would otherwise be split into single characters.
        if isinstance(topics, str):
            topics = [topics]
        else:
            topics = list(topics)
        # json.dumps escapes the token and topic names correctly, which
        # manual string concatenation does not.
        return json.dumps({
            "type": "LISTEN",
            "nonce": "1234554321",
            "data": {
                "topics": topics,
                "auth_token": self._accessToken,
            },
        })

    async def connect(self):
        """Open the websocket, send the LISTEN request and print the reply."""
        msg = self._build_listen_message()
        self._connection = await websockets.client.connect(self._url)
        await self._connection.send(msg)
        response = await self._connection.recv()
        # TODO: Replace with connection success/fail handling
        print(f"Connection status:{response}")

    async def receive(self):
        """Read frames from the connection forever and queue each one.

        Requires connect() to have completed first.
        """
        while True:
            print("receive()")
            msg = await self._connection.recv()
            self._messages.put(msg)

    async def heartbeat(self):
        """Send a PING frame every 60 seconds while the connection is open."""
        msg = '{"type":"PING"}'
        while True:
            print("heartbeat()")
            # Skip the send (but keep looping) when there is no open
            # connection, e.g. when started before connect() finished.
            if self._connection is not None and self._connection.open:
                print("heartbeat(): Connection opened")
                await self._connection.send(msg)
            await asyncio.sleep(60)

    async def getNextMessage(self):
        """Return the oldest queued frame, polling once per second until one exists."""
        while True:
            try:
                print("getNextMessage()")
                return self._messages.get_nowait()
            except queue.Empty:
                print("getNextMessage(): sleep")
                # Yield to the event loop so receive() can fill the queue.
                await asyncio.sleep(1)