Skip to content

Commit

Permalink
resolve asyncio loop bug
Browse files Browse the repository at this point in the history
  • Loading branch information
wj-Mcat committed May 9, 2020
1 parent aca6b59 commit cc22602
Show file tree
Hide file tree
Showing 13 changed files with 175 additions and 57 deletions.
7 changes: 5 additions & 2 deletions examples/ding-dong-bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ async def do_some_thing():


# puppet_options = PuppetOptions(token='your-token-here')
hostie_puppet = HostiePuppet(PuppetOptions('your-token-here'), 'hostie-puppet')
bot = Wechaty(hostie_puppet).on('message', message)

bot: Wechaty = None

async def main():
"""doc"""
hostie_puppet = HostiePuppet(PuppetOptions('donut-test-user-6005'),
'hostie-puppet')
global bot
bot = Wechaty(hostie_puppet).on('message', message)
await bot.start()
await do_some_thing()

Expand Down
Empty file added examples/event_bot.py
Empty file.
6 changes: 3 additions & 3 deletions src/wechaty/user/contact.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async def ready(self, force_sync: bool = False):
:return:
"""
log.info('load contact %s', self.name)
if force_sync or self.is_ready():
if force_sync or not self.is_ready():
try:
payload = await self.puppet.contact_payload(
self.contact_id)
Expand Down Expand Up @@ -374,8 +374,8 @@ async def tags(self) -> List[Tag]:
"""
log.info('load contact tags for %s', self)
tag_ids = await self.puppet.tag_contact_list(self.contact_id)
tags = [self.wechaty.Tag.load(contact_id)
for contact_id in tag_ids]
tags = [self.wechaty.Tag.load(tag_id)
for tag_id in tag_ids]
return tags

async def sync(self):
Expand Down
8 changes: 5 additions & 3 deletions src/wechaty/wechaty.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,9 @@ async def start(self):
start wechaty bot
:return:
"""

await self.init_puppet()
await self.init_puppet_event_bridge(self.puppet)
await self.puppet.start()

# pylint: disable=R0912,R0915,R0914
async def init_puppet_event_bridge(self, puppet: Puppet):
Expand All @@ -312,11 +313,11 @@ def error_listener(payload: EventErrorPayload):

puppet.on('error', error_listener)

elif event_name == 'heartbeat':
elif event_name == 'heart-beat':
def heartbeat_listener(payload: EventHeartbeatPayload):
self.event_stream.emit('heartbeat', payload.data)

puppet.on('heartbeat', heartbeat_listener)
puppet.on('heart-beat', heartbeat_listener)

elif event_name == 'friendship':
async def friendship_listener(payload: EventFriendshipPayload):
Expand All @@ -330,6 +331,7 @@ async def friendship_listener(payload: EventFriendshipPayload):
elif event_name == 'login':
async def login_listener(payload: EventLoginPayload):
# TODO -> should to ContactSelf
log.info('login() <%s>', payload)
contact = self.Contact.load(payload.contact_id)
await contact.ready()
self.emit('login', Contact)
Expand Down
8 changes: 0 additions & 8 deletions src/wechaty_puppet/puppet.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,6 @@ async def contact_list(self) -> List[str]:
"""
raise NotImplementedError

async def get_contact_payload(self, contact_id: str) -> ContactPayload:
"""
get
:param contact_id:
:return:
"""
raise NotImplementedError

async def tag_contact_delete(self, tag_id: str) -> None:
"""
:return:
Expand Down
1 change: 1 addition & 0 deletions src/wechaty_puppet/schemas/contact.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class ContactPayload:
name: str
avatar: str

address: Optional[str] = None
alias: Optional[str] = None
city: Optional[str] = None
friend: Optional[bool] = None
Expand Down
10 changes: 10 additions & 0 deletions src/wechaty_puppet/schemas/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from dataclasses import dataclass
from typing import List, Optional

from wechaty_puppet import MessageType


class ScanStatus(Enum):
"""
Expand Down Expand Up @@ -61,6 +63,14 @@ class EventLogoutPayload(EventPayloadBase):
@dataclass
class EventMessagePayload(EventPayloadBase):
message_id: str
type: Optional[str] = None
from_id: Optional[str] = None
filename: Optional[str] = None
text: Optional[str] = None
timestamp: Optional[float] = None
room_id: Optional[str] = None
to_id: Optional[str] = None
mention_ids: Optional[List[str]] = None


@dataclass
Expand Down
5 changes: 5 additions & 0 deletions src/wechaty_puppet/schemas/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ class MessagePayloadTo:
room_id: Optional[str] = None


@dataclass
class EventMessagePayload:
message_id: str


@dataclass
class MessagePayload:
id: str
Expand Down
2 changes: 1 addition & 1 deletion src/wechaty_puppet/schemas/puppet.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class PuppetOptions:
"""
option to config puppet
"""
token: Optional[str] = None
end_point: Optional[str] = None
timeout: Optional[str] = None
token: Optional[str] = None


CHAT_EVENT_DICT = {
Expand Down
33 changes: 33 additions & 0 deletions src/wechaty_puppet_hostie/contact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from chatie_grpc.wechaty import ContactPayloadResponse
from chatie_grpc.wechaty import ContactGender as ChatieContactGender
from chatie_grpc.wechaty import ContactType as ChatieContactType

from wechaty_puppet import ContactPayload, ContactGender, ContactType
from wechaty_puppet_hostie.utils import get_common_attributes

CONTACT_GENDER_MAP = {
ChatieContactGender.CONTACT_GENDER_FEMALE.name: ContactGender.Female,
ChatieContactGender.CONTACT_GENDER_MALE.name: ContactGender.Male,
ChatieContactGender.CONTACT_GENDER_UNSPECIFIED.name: ContactGender.Unknown
}

CONTACT_TYPE_MAP = {
ChatieContactType.CONTACT_TYPE_PERSONAL.name: ContactType.Personal,
ChatieContactType.CONTACT_TYPE_OFFICIAL.name: ContactType.Official,
ChatieContactType.CONTACT_TYPE_UNSPECIFIED.name: ContactType.Unknown
}


def get_contact_payload_from_response(response: ContactPayloadResponse
) -> ContactPayload:
"""
:param response:
:return:
"""
payload_data = response.to_dict()
common_attributes = get_common_attributes(payload_data, ContactPayload)
common_attributes['type'] = CONTACT_TYPE_MAP[payload_data['type']]
common_attributes['gender'] = CONTACT_GENDER_MAP[payload_data['gender']]
return ContactPayload(**common_attributes)


43 changes: 43 additions & 0 deletions src/wechaty_puppet_hostie/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Dict

from chatie_grpc.wechaty import MessageType as ChatieMessageType, \
MessagePayloadResponse

from wechaty_puppet import EventMessagePayload, MessageType


# TODO -> this should be improved later
from wechaty_puppet_hostie.utils import get_common_attributes

MESSAGE_TYPE_MAP: Dict[ChatieMessageType, MessageType] = {
ChatieMessageType.MESSAGE_TYPE_MINI_PROGRAM.name: MessageType.MiniProgram,
ChatieMessageType.MESSAGE_TYPE_LOCATION.name: MessageType.Location,
ChatieMessageType.MESSAGE_TYPE_URL.name: MessageType.Url,
ChatieMessageType.MESSAGE_TYPE_CONTACT.name: MessageType.Contact,
ChatieMessageType.MESSAGE_TYPE_TEXT.name: MessageType.Text,
ChatieMessageType.MESSAGE_TYPE_AUDIO.name: MessageType.Audio
}


def get_message_payload_from_response(response: MessagePayloadResponse
) -> EventMessagePayload:
"""
:param response:
:return:
"""
payload_data = response
common_attributes = get_common_attributes(
payload_data, EventMessagePayload)

common_attributes['message_id'] = payload_data.get(
'id', None)
message_type = payload_data.get('type', None)
if message_type is None:
raise ValueError('message response data is invalid, '
'not contains type field <%s>',
payload_data)
common_attributes['type'] = MESSAGE_TYPE_MAP[message_type]
payload = EventMessagePayload(**common_attributes)
return payload


91 changes: 51 additions & 40 deletions src/wechaty_puppet_hostie/puppet.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"""
from __future__ import annotations

import asyncio
import json
import logging
from typing import Optional, List, Dict, Tuple
Expand Down Expand Up @@ -59,6 +60,10 @@
from wechaty_puppet.schemas.puppet import PuppetOptions
# pylint: disable=R0904
from wechaty_puppet.schemas.url_link import UrlLinkPayload
from .contact import get_contact_payload_from_response
from .message import get_message_payload_from_response

from .utils import get_common_attributes

log = logging.getLogger('HostiePuppet')

Expand Down Expand Up @@ -144,20 +149,6 @@ async def contact_list(self) -> List[str]:
raise ValueError('response is invalid')
return response.ids

async def get_contact_payload(self, contact_id: str) -> ContactPayload:
"""
get contact payload
:param contact_id:
:return:
"""
response = await self.puppet_stub.contact_payload(id=contact_id)
if response is None:
# TODO -> need to refactor the raised error
raise ValueError('response is invalid')

contact_payload = ContactPayload(**response.to_dict())
return contact_payload

async def tag_contact_delete(self, tag_id: str) -> None:
"""
delete some tag
Expand Down Expand Up @@ -394,7 +385,7 @@ async def contact_payload(self, contact_id: str) -> ContactPayload:
:return:
"""
response = await self.puppet_stub.contact_payload(id=contact_id)
payload = ContactPayload(**response.to_dict())
payload = get_contact_payload_from_response(response)
return payload

async def contact_avatar(self, contact_id: str,
Expand Down Expand Up @@ -649,7 +640,26 @@ async def start(self) -> None:
start puppet_stub
:return:
"""
await self.puppet_stub.stop()
# await self.puppet_stub.stop()
# loop = asyncio.get_event_loop()
# loop.run_forever()
#
# loop.run_until_complete(self.puppet_stub.start(), self._listen_for_event())

# await asyncio.gather(
# self.puppet_stub.start(),
# self._listen_for_event()
# )
# loop = asyncio.get_running_loop()
#
# asyncio.run_coroutine_threadsafe(
# self.puppet_stub.start(),
# loop
# )
try:
await self.puppet_stub.stop()
except Exception as exception:
pass
await self.puppet_stub.start()
await self._listen_for_event()
return None
Expand All @@ -658,6 +668,7 @@ async def stop(self):
"""
stop the grpc channel connection
"""

await self.puppet_stub.stop()
await self.channel.close()

Expand All @@ -669,71 +680,71 @@ async def _listen_for_event(self):
# listen event from grpclib
async for response in self.puppet_stub.event():
if response is not None:
payload_data = json.loads(response.payload)
if response.type == EventType.EVENT_TYPE_SCAN:
payload_data: dict = json.loads(response.payload)
if response.type == EventType.EVENT_TYPE_SCAN.value:
# create qr_code
payload = EventScanPayload(**payload_data)
self._event_stream.emit('scan', payload)

elif response.type == EventType.EVENT_TYPE_DONG:
elif response.type == EventType.EVENT_TYPE_DONG.value:
payload = EventDongPayload(**payload_data)
self._event_stream.emit('dong', payload)

elif response.type == EventType.EVENT_TYPE_MESSAGE:
payload = EventMessagePayload(**payload_data)
elif response.type == EventType.EVENT_TYPE_MESSAGE.value:
payload = get_message_payload_from_response(response)
self._event_stream.emit('message', payload)

elif response.type == EventType.EVENT_TYPE_HEARTBEAT:
elif response.type == EventType.EVENT_TYPE_HEARTBEAT.value:
payload = EventHeartbeatPayload(**payload_data)
self._event_stream.emit('heartbeat', payload)

elif response.type == EventType.EVENT_TYPE_DONG:
payload = EventDongPayload(**payload_data)
self._event_stream.emit('dong', payload)

elif response.type == EventType.EVENT_TYPE_ERROR:
elif response.type == EventType.EVENT_TYPE_ERROR.value:
payload = EventErrorPayload(**payload_data)
self._event_stream.emit('error', payload)

elif response.type == EventType.EVENT_TYPE_FRIENDSHIP:
elif response.type == EventType.EVENT_TYPE_FRIENDSHIP.value:
payload = EventFriendshipPayload(**payload_data)
self._event_stream.emit('friendship', payload)

elif response.type == EventType.EVENT_TYPE_ROOM_JOIN:
elif response.type == EventType.EVENT_TYPE_ROOM_JOIN.value:
payload = EventRoomJoinPayload(**payload_data)
self._event_stream.emit('room-join', payload)

elif response.type == EventType.EVENT_TYPE_ROOM_INVITE:
elif response.type == EventType.EVENT_TYPE_ROOM_INVITE.value:
payload = EventRoomInvitePayload(**payload_data)
self._event_stream.emit('room-invite', payload)

elif response.type == EventType.EVENT_TYPE_ROOM_LEAVE:
elif response.type == EventType.EVENT_TYPE_ROOM_LEAVE.value:
payload = EventRoomLeavePayload(**payload_data)
self._event_stream.emit('room-leave', payload)

elif response.type == EventType.EVENT_TYPE_ROOM_TOPIC:
elif response.type == EventType.EVENT_TYPE_ROOM_TOPIC.value:
payload = EventRoomTopicPayload(**payload_data)
self._event_stream.emit('room-topic', payload)

elif response.type == EventType.EVENT_TYPE_ROOM_TOPIC:
elif response.type == EventType.EVENT_TYPE_ROOM_TOPIC.value:
payload = EventRoomTopicPayload(**payload_data)
self._event_stream.emit('room-topic', payload)

elif response.type == EventType.EVENT_TYPE_READY:
elif response.type == EventType.EVENT_TYPE_READY.value:
payload = EventReadyPayload(**payload_data)
self._event_stream.emit('ready', payload)

elif response.type == EventType.EVENT_TYPE_RESET:
elif response.type == EventType.EVENT_TYPE_RESET.value:
payload = EventResetPayload(**payload_data)
self._event_stream.emit('reset', payload)

elif response.type == EventType.EVENT_TYPE_LOGIN:
payload = EventLoginPayload(**payload_data)
elif response.type == EventType.EVENT_TYPE_LOGIN.value:
payload = EventLoginPayload(
contact_id=payload_data.get('contactId', None)
)
self._event_stream.emit('login', payload)

elif response.type == EventType.EVENT_TYPE_LOGOUT:
payload = EventLogoutPayload(**payload_data)
elif response.type == EventType.EVENT_TYPE_LOGOUT.value:
payload = EventLogoutPayload(
contact_id=payload_data["contactId"]
)
self._event_stream.emit('logout', payload)

elif response.type == EventType.EVENT_TYPE_UNSPECIFIED:
elif response.type == EventType.EVENT_TYPE_UNSPECIFIED.value:
pass
Loading

0 comments on commit cc22602

Please sign in to comment.