diff --git a/examples/audio-only_client.py b/examples/audio-only_client.py index 82397a0..6ce626e 100644 --- a/examples/audio-only_client.py +++ b/examples/audio-only_client.py @@ -23,11 +23,15 @@ # Works on MacOS. Does NOT work on RPi 3B+ (I cannot figure out why. Help will # be much appreciated) +import typing + +import pyaudio import pymumble.pymumble_py3 as pymumble_py3 from pymumble.pymumble_py3.callbacks import PYMUMBLE_CLBK_SOUNDRECEIVED as PCS -import subprocess as sp -from time import sleep -import pyaudio + +if typing.TYPE_CHECKING: + from pymumble.pymumble_py3.users import User + from pymumble.pymumble_py3.soundqueue import SoundChunk # Connection details for mumble server. Harded code for now, will have to be # command line arguments eventually @@ -44,16 +48,17 @@ RATE = 48000 # pymumble soundchunk.pcm is 48000Hz p = pyaudio.PyAudio() -stream = p.open(format=FORMAT, - channels=CHANNELS, - rate=RATE, - input=True, # enable both talk - output=True, # and listen - frames_per_buffer=CHUNK) - +stream = p.open( + format=FORMAT, + channels=CHANNELS, + rate=RATE, + input=True, # enable both talk + output=True, # and listen + frames_per_buffer=CHUNK, +) # mumble client set up -def sound_received_handler(user, soundchunk): +def sound_received_handler(user: "User", soundchunk: "SoundChunk") -> None: """ play sound received from mumble server upon its arrival """ stream.write(soundchunk.pcm) diff --git a/examples/echobot.py b/examples/echobot.py index c4cc74f..43f7e95 100755 --- a/examples/echobot.py +++ b/examples/echobot.py @@ -2,23 +2,29 @@ # This bot sends any sound it receives back to where it has come from. # WARNING! Don't put two bots in the same place! -import pymumble_py3 import time -from pymumble_py3.callbacks import PYMUMBLE_CLBK_SOUNDRECEIVED as PCS +import typing + +import pymumble_py3 +from pymumble_py3.constants import PYMUMBLE_CLBK_SOUNDRECEIVED as PCS + +if typing.TYPE_CHECKING: + from pymumble.pymumble_py3.users import User + from pymumble.pymumble_py3.soundqueue import SoundChunk pwd = "" # password server = "localhost" nick = "Bob" -def sound_received_handler(user, soundchunk): +def sound_received_handler(user: "User", soundchunk: "SoundChunk") -> None: # sending the received sound back to server mumble.sound_output.add_sound(soundchunk.pcm) mumble = pymumble_py3.Mumble(server, nick, password=pwd) mumble.callbacks.set_callback(PCS, sound_received_handler) -mumble.set_receive_sound(1) # we want to receive sound +mumble.set_receive_sound(True) # we want to receive sound mumble.start() while 1: diff --git a/examples/talking_bot.py b/examples/talking_bot.py index 982abfa..65f44df 100755 --- a/examples/talking_bot.py +++ b/examples/talking_bot.py @@ -2,10 +2,12 @@ # This bot reads standard input and converts them to speech via espeak and # sends them to server(of course after converting the wave format to s32le) # A blank line to exit. -import pymumble_py3 import subprocess as sp + +import pymumble_py3 + try: - import readline # optional + import readline # optional except ImportError: pass @@ -17,13 +19,14 @@ mumble.start() s = " " while s: - s = input(") ") + s = input(") ") # converting text to speech - command = ["espeak","--stdout", s] + command = ["espeak", "--stdout", s] wave_file = sp.Popen(command, stdout=sp.PIPE).stdout # converting the wave speech to pcm - command = ["ffmpeg", "-i", "-", "-ac", "1", "-f", "s32le","-"] - sound = sp.Popen(command, stdout=sp.PIPE, stderr=sp.DEVNULL, - stdin=wave_file).stdout.read() + command = ["ffmpeg", "-i", "-", "-ac", "1", "-f", "s32le", "-"] + sound = sp.Popen( # type: ignore + command, stdout=sp.PIPE, stderr=sp.DEVNULL, stdin=wave_file + ).stdout.read() # sending speech to server mumble.sound_output.add_sound(sound) diff --git a/pymumble_py3/__init__.py b/pymumble_py3/__init__.py index a67cdea..e49506c 100644 --- a/pymumble_py3/__init__.py +++ b/pymumble_py3/__init__.py @@ -1,3 +1,5 @@ # -*- coding: utf-8 -*- from .mumble import Mumble + +__all__ = ["Mumble"] diff --git a/pymumble_py3/blobs.py b/pymumble_py3/blobs.py index f32db7f..54821e7 100644 --- a/pymumble_py3/blobs.py +++ b/pymumble_py3/blobs.py @@ -1,42 +1,47 @@ # -*- coding: utf-8 -*- import struct +import typing from .constants import * from .mumble_pb2 import RequestBlob +if typing.TYPE_CHECKING: + from .mumble import Mumble -class Blobs(dict): + +class Blobs(typing.Dict[bytes, typing.Any]): """ Manage the Blob library """ - def __init__(self, mumble_object): + + def __init__(self, mumble_object: "Mumble"): self.mumble_object = mumble_object - - def get_user_comment(self, hash): + + def get_user_comment(self, hash: bytes) -> None: """Request the comment of a user""" if hash in self: return request = RequestBlob() request.session_comment.extend(struct.unpack("!5I", hash)) - + self.mumble_object.send_message(PYMUMBLE_MSG_TYPES_REQUESTBLOB, request) - - def get_user_texture(self, hash): + + def get_user_texture(self, hash: bytes) -> None: """Request the image of a user""" if hash in self: return request = RequestBlob() request.session_texture.extend(struct.unpack("!5I", hash)) - + self.mumble_object.send_message(PYMUMBLE_MSG_TYPES_REQUESTBLOB, request) - - def get_channel_description(self, hash): + + def get_channel_description(self, hash: bytes) -> None: """Request the description/comment of a channel""" if hash in self: return request = RequestBlob() request.channel_description.extend(struct.unpack("!5I", hash)) - + self.mumble_object.send_message(PYMUMBLE_MSG_TYPES_REQUESTBLOB, request) diff --git a/pymumble_py3/callbacks.py b/pymumble_py3/callbacks.py index 8ac059c..16d0f9e 100644 --- a/pymumble_py3/callbacks.py +++ b/pymumble_py3/callbacks.py @@ -1,11 +1,16 @@ # -*- coding: utf-8 -*- -from .errors import UnknownCallbackError -from .constants import * import threading +import typing + +from .constants import * +from .errors import UnknownCallbackError + +_Callback = typing.Callable[..., typing.Any] +_Callbacks = typing.List[_Callback] -class CallBacks(dict): +class CallBacks(typing.Dict[str, typing.Optional[_Callbacks]]): """ Define the callbacks that can be registered by the application. Multiple functions can be assigned to a callback using "add_callback" @@ -13,79 +18,85 @@ class CallBacks(dict): The call is done from within the pymumble loop thread, it's important to keep processing short to avoid delays on audio transmission """ - def __init__(self): - self.update({ - PYMUMBLE_CLBK_CONNECTED: None, # Connection succeeded - PYMUMBLE_CLBK_CHANNELCREATED: None, # send the created channel object as parameter - PYMUMBLE_CLBK_CHANNELUPDATED: None, # send the updated channel object and a dict with all the modified fields as parameter - PYMUMBLE_CLBK_CHANNELREMOVED: None, # send the removed channel object as parameter - PYMUMBLE_CLBK_USERCREATED: None, # send the added user object as parameter - PYMUMBLE_CLBK_USERUPDATED: None, # send the updated user object and a dict with all the modified fields as parameter - PYMUMBLE_CLBK_USERREMOVED: None, # send the removed user object and the mumble message as parameter - PYMUMBLE_CLBK_SOUNDRECEIVED: None, # send the user object that received the sound and the SoundChunk object itself - PYMUMBLE_CLBK_TEXTMESSAGERECEIVED: None, # Send the received message - PYMUMBLE_CLBK_CONTEXTACTIONRECEIVED: None, # Send the contextaction message - }) - - def set_callback(self, callback, dest): + + def __init__(self) -> None: + self.update( + { + PYMUMBLE_CLBK_CONNECTED: None, # Connection succeeded + PYMUMBLE_CLBK_CHANNELCREATED: None, # send the created channel object as parameter + PYMUMBLE_CLBK_CHANNELUPDATED: None, # send the updated channel object and a dict with all the modified fields as parameter + PYMUMBLE_CLBK_CHANNELREMOVED: None, # send the removed channel object as parameter + PYMUMBLE_CLBK_USERCREATED: None, # send the added user object as parameter + PYMUMBLE_CLBK_USERUPDATED: None, # send the updated user object and a dict with all the modified fields as parameter + PYMUMBLE_CLBK_USERREMOVED: None, # send the removed user object and the mumble message as parameter + PYMUMBLE_CLBK_SOUNDRECEIVED: None, # send the user object that received the sound and the SoundChunk object itself + PYMUMBLE_CLBK_TEXTMESSAGERECEIVED: None, # Send the received message + PYMUMBLE_CLBK_CONTEXTACTIONRECEIVED: None, # Send the contextaction message + } + ) + + def set_callback(self, callback: str, dest: _Callback) -> None: """Define the function to call for a specific callback. Suppress any existing callback function""" if callback not in self: - raise UnknownCallbackError("Callback \"%s\" does not exists." % callback) - + raise UnknownCallbackError('Callback "%s" does not exists.' % callback) + self[callback] = [dest] - - def add_callback(self, callback, dest): + + def add_callback(self, callback: str, dest: _Callback) -> None: """Add the function to call for a specific callback.""" if callback not in self: - raise UnknownCallbackError("Callback \"%s\" does not exists." % callback) - + raise UnknownCallbackError('Callback "%s" does not exists.' % callback) + if self[callback] is None: self[callback] = list() - self[callback].append(dest) - - def get_callback(self, callback): + typing.cast(_Callbacks, self[callback]).append(dest) + + def get_callback(self, callback: str) -> typing.Optional[_Callbacks]: """Get the functions assigned to a callback as a list. Return None if no callback defined""" if callback not in self: - raise UnknownCallbackError("Callback \"%s\" does not exists." % callback) - + raise UnknownCallbackError('Callback "%s" does not exists.' % callback) + return self[callback] - - def remove_callback(self, callback, dest): + + def remove_callback(self, callback: str, dest: _Callback) -> None: """Remove a specific function from a specific callback. Function object must be the one added before.""" if callback not in self: - raise UnknownCallbackError("Callback \"%s\" does not exists." % callback) - - if self[callback] is None or dest not in self[callback]: - raise UnknownCallbackError("Function not registered for callback \"%s\"." % callback) - - self[callback].remove(dest) - if len(self[callback]) == 0: - self[callback] = None - - def reset_callback(self, callback): + raise UnknownCallbackError('Callback "%s" does not exists.' % callback) + + callbacks = self[callback] + if callbacks is None or dest not in callbacks: + raise UnknownCallbackError( + 'Function not registered for callback "%s".' % callback + ) + + callbacks.remove(dest) + if len(callbacks) == 0: + callbacks = None + + def reset_callback(self, callback: str) -> None: """remove functions for a defined callback""" if callback not in self: - raise UnknownCallbackError("Callback \"%s\" does not exists." % callback) - + raise UnknownCallbackError('Callback "%s" does not exists.' % callback) + self[callback] = None - - def call_callback(self, callback, *pos_parameters): + + def call_callback(self, callback: str, *pos_parameters: typing.Any) -> None: """Call all the registered function for a specific callback.""" if callback not in self: - raise UnknownCallbackError("Callback \"%s\" does not exists." % callback) - + raise UnknownCallbackError('Callback "%s" does not exists.' % callback) + if self[callback]: - for func in self[callback]: + for func in typing.cast(_Callbacks, self[callback]): if callback is PYMUMBLE_CLBK_TEXTMESSAGERECEIVED: thr = threading.Thread(target=func, args=pos_parameters) thr.start() else: func(*pos_parameters) - - def __call__(self, callback, *pos_parameters): + + def __call__(self, callback: str, *pos_parameters: typing.Any) -> None: """shortcut to be able to call the dict element as a function""" self.call_callback(callback, *pos_parameters) - - def get_callbacks_list(self): + + def get_callbacks_list(self) -> typing.List[str]: """Get a list of all callbacks""" return list(self.keys()) diff --git a/pymumble_py3/channels.py b/pymumble_py3/channels.py index 27c32fd..a557c9d 100644 --- a/pymumble_py3/channels.py +++ b/pymumble_py3/channels.py @@ -1,22 +1,31 @@ # -*- coding: utf-8 -*- -from .constants import * +import typing from threading import Lock -from .errors import UnknownChannelError, TextTooLongError, ImageTooBigError + from . import messages +from .callbacks import CallBacks +from .constants import * +from .errors import ImageTooBigError, TextTooLongError, UnknownChannelError +from .users import User + +if typing.TYPE_CHECKING: + from .mumble import Mumble + +ProtoMessage = typing.Any -class Channels(dict): +class Channels(typing.Dict[int, "Channel"]): """ Object that Stores all channels and their properties. """ - def __init__(self, mumble_object, callbacks): + def __init__(self, mumble_object: "Mumble", callbacks: CallBacks): self.mumble_object = mumble_object self.callbacks = callbacks self.lock = Lock() - def update(self, message): + def update(self, message: ProtoMessage) -> None: # type: ignore """Update the channel information based on an incoming message""" self.lock.acquire() @@ -25,11 +34,13 @@ def update(self, message): self.callbacks(PYMUMBLE_CLBK_CHANNELCREATED, self[message.channel_id]) else: # update the channel actions = self[message.channel_id].update(message) - self.callbacks(PYMUMBLE_CLBK_CHANNELUPDATED, self[message.channel_id], actions) + self.callbacks( + PYMUMBLE_CLBK_CHANNELUPDATED, self[message.channel_id], actions + ) self.lock.release() - def remove(self, id): + def remove(self, id: int) -> None: """Delete a channel when server signal the channel is removed""" self.lock.acquire() @@ -40,16 +51,16 @@ def remove(self, id): self.lock.release() - def find_by_tree(self, tree): + def find_by_tree(self, tree: typing.Iterable[str]) -> "Channel": """Find a channel by its full path (a list with an element for each leaf)""" - if not getattr(tree, '__iter__', False): + if not getattr(tree, "__iter__", False): tree = tree # function use argument as a list current = self[0] for name in tree: # going up the tree found = False - for subchannel in self.get_childs(current).values(): + for subchannel in self.get_childs(current): if subchannel["name"] == name: current = subchannel found = True @@ -61,28 +72,30 @@ def find_by_tree(self, tree): return current - def get_childs(self, channel): + def get_childs(self, channel: "Channel") -> typing.List["Channel"]: """Get the child channels of a channel in a list""" childs = list() for item in self.values(): - if item.get('parent') and item["parent"] == channel["channel_id"]: + if item.get("parent") and item["parent"] == channel["channel_id"]: childs.append(item) return childs - def get_descendants(self, channel): + def get_descendants( + self, channel: "Channel" + ) -> typing.List[typing.List["Channel"]]: """Get all the descendant of a channel, in nested lists""" descendants = list() - for subchannel in channel.get_childs(): - descendants.append(subchannel.get_childs()) + for subchannel in self.get_childs(channel): + descendants.append(self.get_childs(subchannel)) return descendants - def get_tree(self, channel): + def get_tree(self, channel: "Channel") -> typing.List["Channel"]: """Get the whole list of channels, in a multidimensional list""" - tree = list() + tree: typing.List[Channel] = list() current = channel @@ -94,7 +107,7 @@ def get_tree(self, channel): return tree - def find_by_name(self, name): + def find_by_name(self, name: str) -> "Channel": """Find a channel by name. Stop on the first that match""" if name == "": return self[0] @@ -106,33 +119,33 @@ def find_by_name(self, name): err = "Channel %s does not exists" % name raise UnknownChannelError(err) - def new_channel(self, parent, name, temporary=True): + def new_channel(self, parent: int, name: str, temporary: bool = True) -> None: cmd = messages.CreateChannel(parent, name, temporary) self.mumble_object.execute_command(cmd) - def remove_channel(self, channel_id): + def remove_channel(self, channel_id: int) -> None: cmd = messages.RemoveChannel(channel_id) self.mumble_object.execute_command(cmd) -class Channel(dict): +class Channel(typing.Dict[str, typing.Any]): """ Stores information about one specific channel """ - def __init__(self, mumble_object, message): + def __init__(self, mumble_object: "Mumble", message: ProtoMessage) -> None: self.mumble_object = mumble_object self["channel_id"] = message.channel_id self.update(message) - def get_users(self): + def get_users(self) -> typing.List[User]: users = [] for user in list(self.mumble_object.users.values()): if user["channel_id"] == self["channel_id"]: users.append(user) return users - def update(self, message): + def update(self, message: ProtoMessage) -> typing.Dict[str, typing.Any]: # type: ignore """Update a channel based on an incoming message""" actions = dict() @@ -142,18 +155,24 @@ def update(self, message): actions.update(self.update_field(field.name, value)) if message.HasField("description_hash"): - actions.update(self.update_field("description_hash", message.description_hash)) + actions.update( + self.update_field("description_hash", message.description_hash) + ) if message.HasField("description"): self.mumble_object.blobs[message.description_hash] = message.description else: - self.mumble_object.blobs.get_channel_description(message.description_hash) + self.mumble_object.blobs.get_channel_description( + message.description_hash + ) return actions # return a dict with updates performed, useful for the callback functions - def get_id(self): - return self["channel_id"] + def get_id(self) -> int: + return typing.cast(int, self["channel_id"]) - def update_field(self, name, field): + def update_field( + self, name: str, field: typing.Any + ) -> typing.Dict[str, typing.Any]: """Update one value""" actions = dict() if name not in self or self[name] != field: @@ -162,25 +181,26 @@ def update_field(self, name, field): return actions # return a dict with updates performed, useful for the callback functions - def get_property(self, property): + def get_property(self, property: str) -> typing.Optional[typing.Any]: if property in self: return self[property] else: return None - def move_in(self, session=None): + def move_in(self, session: typing.Optional[int] = None) -> None: """Ask to move a session in a specific channel. By default move pymumble own session""" if session is None: session = self.mumble_object.users.myself_session + assert session is not None cmd = messages.MoveCmd(session, self["channel_id"]) self.mumble_object.execute_command(cmd) - def remove(self): + def remove(self) -> None: cmd = messages.RemoveChannel(self["channel_id"]) self.mumble_object.execute_command(cmd) - def send_text_message(self, message): + def send_text_message(self, message: str) -> None: """Send a text message to the channel.""" # TODO: This check should be done inside execute_command() @@ -194,6 +214,7 @@ def send_text_message(self, message): raise TextTooLongError(self.mumble_object.get_max_message_length()) session = self.mumble_object.users.myself_session + assert session is not None cmd = messages.TextMessage(session, self["channel_id"], message) self.mumble_object.execute_command(cmd) diff --git a/pymumble_py3/commands.py b/pymumble_py3/commands.py index c724fbb..bb8da66 100644 --- a/pymumble_py3/commands.py +++ b/pymumble_py3/commands.py @@ -1,6 +1,9 @@ # -*- coding: utf-8 -*- -from threading import Lock +import typing from collections import deque +from threading import Lock + +from .messages import Cmd class Commands: @@ -9,33 +12,34 @@ class Commands: from whatever tread. Each command has it's own lock semaphore to signal is received an answer """ - def __init__(self): + + def __init__(self) -> None: self.id = 0 - - self.queue = deque() - + + self.queue: typing.Deque[Cmd] = deque() + self.lock = Lock() - - def new_cmd(self, cmd): + + def new_cmd(self, cmd: Cmd) -> Lock: """Add a command to the queue""" self.lock.acquire() self.id += 1 - cmd.msg_id = self.id + cmd.cmd_id = self.id self.queue.appendleft(cmd) cmd.lock.acquire() self.lock.release() return cmd.lock - - def is_cmd(self): + + def is_cmd(self) -> bool: """Check if there is a command waiting in the queue""" if len(self.queue) > 0: return True else: return False - - def pop_cmd(self): + + def pop_cmd(self) -> typing.Optional[Cmd]: """Return the next command and remove it from the queue""" self.lock.acquire() @@ -46,9 +50,7 @@ def pop_cmd(self): else: self.lock.release() return None - - def answer(self, cmd): + + def answer(self, cmd: Cmd) -> None: """Unlock the command to signal it's completion""" cmd.lock.release() - - diff --git a/pymumble_py3/constants.py b/pymumble_py3/constants.py index 604a267..fc2f58b 100644 --- a/pymumble_py3/constants.py +++ b/pymumble_py3/constants.py @@ -9,10 +9,10 @@ # Tunable parameters # ============================================================================ PYMUMBLE_CONNECTION_RETRY_INTERVAL = 10 # in sec -PYMUMBLE_AUDIO_PER_PACKET = float(20)/1000 # size of one audio packet in sec +PYMUMBLE_AUDIO_PER_PACKET = float(20) / 1000 # size of one audio packet in sec PYMUMBLE_BANDWIDTH = 50 * 1000 # total outgoing bitrate in bit/seconds PYMUMBLE_LOOP_RATE = 0.01 # pause done between two iteration of the main loop of the mumble thread, in sec - # should be small enough to manage the audio output, so smaller than PYMUMBLE_AUDIO_PER_PACKET +# should be small enough to manage the audio output, so smaller than PYMUMBLE_AUDIO_PER_PACKET # ============================================================================ # Constants @@ -20,22 +20,29 @@ PYMUMBLE_PROTOCOL_VERSION = (1, 2, 4) PYMUMBLE_VERSION_STRING = "PyMumble %s" % PYMUMBLE_VERSION PYMUMBLE_OS_STRING = "PyMumble %s" % PYMUMBLE_VERSION -PYMUMBLE_OS_VERSION_STRING = "Python %s - %s %s" % (sys.version, platform.system(), platform.release()) +PYMUMBLE_OS_VERSION_STRING = "Python %s - %s %s" % ( + sys.version, + platform.system(), + platform.release(), +) PYMUMBLE_PING_DELAY = 10 # interval between 2 pings in sec PYMUMBLE_SAMPLERATE = 48000 # in hz +PYMUMBLE_CHANNELS = 1 # channel count -PYMUMBLE_SEQUENCE_DURATION = float(10)/1000 # in sec +PYMUMBLE_SEQUENCE_DURATION = float(10) / 1000 # in sec PYMUMBLE_SEQUENCE_RESET_INTERVAL = 5 # in sec -PYMUMBLE_READ_BUFFER_SIZE = 4096 # how much bytes to read at a time from the control socket, in bytes +PYMUMBLE_READ_BUFFER_SIZE = ( + 4096 # how much bytes to read at a time from the control socket, in bytes +) # client connection state PYMUMBLE_CONN_STATE_NOT_CONNECTED = 0 PYMUMBLE_CONN_STATE_AUTHENTICATING = 1 PYMUMBLE_CONN_STATE_CONNECTED = 2 PYMUMBLE_CONN_STATE_FAILED = 3 - + # Mumble control messages types PYMUMBLE_MSG_TYPES_VERSION = 0 PYMUMBLE_MSG_TYPES_UDPTUNNEL = 1 @@ -65,7 +72,7 @@ # callbacks names PYMUMBLE_CLBK_CONNECTED = "connected" -PYMUMBLE_CLBK_CHANNELCREATED = "channel_created" +PYMUMBLE_CLBK_CHANNELCREATED = "channel_created" PYMUMBLE_CLBK_CHANNELUPDATED = "channel_updated" PYMUMBLE_CLBK_CHANNELREMOVED = "channel_remove" PYMUMBLE_CLBK_USERCREATED = "user_created" diff --git a/pymumble_py3/errors.py b/pymumble_py3/errors.py index 1c1d2bf..0c8933f 100644 --- a/pymumble_py3/errors.py +++ b/pymumble_py3/errors.py @@ -4,88 +4,88 @@ class CodecNotSupportedError(Exception): """Thrown when receiving an audio packet from an unsupported codec""" - def __init__(self, value): + def __init__(self, value: str): self.value = value - def __str__(self): + def __str__(self) -> str: return repr(self.value) class ConnectionRejectedError(Exception): """Thrown when server reject the connection""" - def __init__(self, value): + def __init__(self, value: str): self.value = value - def __str__(self): + def __str__(self) -> str: return repr(self.value) class InvalidFormatError(Exception): """Thrown when receiving a packet not understood""" - def __init__(self, value): + def __init__(self, value: str): self.value = value - def __str__(self): + def __str__(self) -> str: return repr(self.value) class UnknownCallbackError(Exception): """Thrown when asked for an unknown callback""" - def __init__(self, value): + def __init__(self, value: str): self.value = value - def __str__(self): + def __str__(self) -> str: return repr(self.value) class UnknownChannelError(Exception): """Thrown when using an unknown channel""" - def __init__(self, value): + def __init__(self, value: str): self.value = value - def __str__(self): + def __str__(self) -> str: return repr(self.value) class InvalidSoundDataError(Exception): """Thrown when trying to send an invalid audio pcm data""" - def __init__(self, value): + def __init__(self, value: str): self.value = value - def __str__(self): + def __str__(self) -> str: return repr(self.value) class InvalidVarInt(Exception): """Thrown when trying to decode an invalid varint""" - def __init__(self, value): + def __init__(self, value: str): self.value = value - def __str__(self): + def __str__(self) -> str: return repr(self.value) class TextTooLongError(Exception): """Thrown when trying to send a message which is longer than allowed""" - def __init__(self, value): + def __init__(self, value: int): self.value = value - def __str__(self): - return 'Maximum Text allowed length: {}'.format(self.value) + def __str__(self) -> str: + return "Maximum Text allowed length: {}".format(self.value) class ImageTooBigError(Exception): """Thrown when trying to send a message or image which is longer than allowed""" - def __init__(self, value): + def __init__(self, value: int): self.value = value - def __str__(self): - return 'Maximum Text/Image allowed length: {}'.format(self.value) + def __str__(self) -> str: + return "Maximum Text/Image allowed length: {}".format(self.value) diff --git a/pymumble_py3/messages.py b/pymumble_py3/messages.py index 83a0ca1..08ae54e 100644 --- a/pymumble_py3/messages.py +++ b/pymumble_py3/messages.py @@ -1,7 +1,9 @@ # -*- coding: utf-8 -*- -from .constants import * +import typing from threading import Lock +from .constants import * + class Cmd: """ @@ -9,55 +11,53 @@ class Cmd: usually to forward to the murmur server """ - def __init__(self): - self.cmd_id = None + def __init__(self) -> None: + self.cmd_id: typing.Optional[int] = None self.lock = Lock() - self.cmd = None - self.parameters = None - self.response = None + self.cmd: typing.Union[str, int, None] = None + self.parameters: typing.Optional[typing.Mapping[str, typing.Any]] = None + self.response: typing.Optional[bool] = None class MoveCmd(Cmd): """Command to move a user from channel""" - def __init__(self, session, channel_id): + def __init__(self, session: int, channel_id: int) -> None: Cmd.__init__(self) self.cmd = PYMUMBLE_CMD_MOVE - self.parameters = {"session": session, - "channel_id": channel_id} + self.parameters = {"session": session, "channel_id": channel_id} class TextMessage(Cmd): """Command to send a text message""" - def __init__(self, session, channel_id, message): + def __init__(self, session: int, channel_id: int, message: str) -> None: Cmd.__init__(self) self.cmd = PYMUMBLE_CMD_TEXTMESSAGE - self.parameters = {"session": session, - "channel_id": channel_id, - "message": message} + self.parameters = { + "session": session, + "channel_id": channel_id, + "message": message, + } class TextPrivateMessage(Cmd): """Command to send a private text message""" - def __init__(self, session, message): + def __init__(self, session: int, message: str) -> None: Cmd.__init__(self) self.cmd = PYMUMBLE_CMD_TEXTPRIVATEMESSAGE - self.parameters = {"session": session, - "message": message} + self.parameters = {"session": session, "message": message} class ModUserState(Cmd): """Command to change a user state""" - def __init__(self, session, params): - Cmd.__init__(self) - + def __init__(self, session: int, params: typing.Mapping[str, typing.Any]) -> None: self.cmd = PYMUMBLE_CMD_MODUSERSTATE self.parameters = params @@ -65,18 +65,17 @@ def __init__(self, session, params): class CreateChannel(Cmd): """Command to create channel""" - def __init__(self, parent, name, temporary): + def __init__(self, parent: int, name: str, temporary: bool) -> None: Cmd.__init__(self) self.cmd = PYMUMBLE_MSG_TYPES_CHANNELSTATE - self.parameters = {"parent": parent, - "name": name, - "temporary": temporary} + self.parameters = {"parent": parent, "name": name, "temporary": temporary} + class RemoveChannel(Cmd): """Command to create channel""" - def __init__(self, channel_id): + def __init__(self, channel_id: int) -> None: Cmd.__init__(self) self.cmd = PYMUMBLE_MSG_TYPES_CHANNELREMOVE @@ -86,9 +85,8 @@ def __init__(self, channel_id): class VoiceTarget(Cmd): """Command to create a whisper""" - def __init__(self, voice_id, targets): + def __init__(self, voice_id: int, targets: typing.Sequence[int]) -> None: Cmd.__init__(self) self.cmd = PYMUMBLE_MSG_TYPES_VOICETARGET - self.parameters = {"id": voice_id, - "targets": targets} + self.parameters = {"id": voice_id, "targets": targets} diff --git a/pymumble_py3/mumble.py b/pymumble_py3/mumble.py index af553cf..ffe8ac3 100644 --- a/pymumble_py3/mumble.py +++ b/pymumble_py3/mumble.py @@ -1,23 +1,28 @@ # -*- coding: utf-8 -*- -import threading import logging -import time import select import socket import ssl import struct - -from .errors import * +import threading +import time +import typing + +from . import ( + blobs, + callbacks, + channels, + commands, + messages, + mumble_pb2, + soundoutput, + tools, + users, +) from .constants import * -from . import users -from . import channels -from . import blobs -from . import commands -from . import callbacks -from . import tools -from . import soundoutput +from .errors import * -from . import mumble_pb2 +ProtoMessage = typing.Any class Mumble(threading.Thread): @@ -26,7 +31,18 @@ class Mumble(threading.Thread): basically a thread """ - def __init__(self, host, user, port=64738, password='', certfile=None, keyfile=None, reconnect=False, tokens=[], debug=False): + def __init__( + self, + host: str, + user: str, + port: int = 64738, + password: str = "", + certfile: typing.Optional[str] = None, + keyfile: typing.Optional[str] = None, + reconnect: bool = False, + tokens: typing.List[str] = [], + debug: bool = False, + ) -> None: """ host=mumble server hostname or address port=mumble server port @@ -41,7 +57,9 @@ def __init__(self, host, user, port=64738, password='', certfile=None, keyfile=N # TODO: use UDP audio threading.Thread.__init__(self) - self.Log = logging.getLogger("PyMumble") # logging object for errors and debugging + self.Log = logging.getLogger( + "PyMumble" + ) # logging object for errors and debugging if debug: self.Log.setLevel(logging.DEBUG) else: @@ -49,12 +67,16 @@ def __init__(self, host, user, port=64738, password='', certfile=None, keyfile=N ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) - formatter = logging.Formatter('%(asctime)s-%(name)s-%(levelname)s-%(message)s') + formatter = logging.Formatter("%(asctime)s-%(name)s-%(levelname)s-%(message)s") ch.setFormatter(formatter) self.Log.addHandler(ch) - self.parent_thread = threading.current_thread() # main thread of the calling application - self.mumble_thread = None # thread of the mumble client library + self.parent_thread = ( + threading.current_thread() + ) # main thread of the calling application + self.mumble_thread: typing.Optional[ + threading.Thread + ] = None # thread of the mumble client library self.host = host self.port = port @@ -63,11 +85,19 @@ def __init__(self, host, user, port=64738, password='', certfile=None, keyfile=N self.certfile = certfile self.keyfile = keyfile self.reconnect = reconnect - self.ping_stats = {"last_rcv": 0, "time_send": 0, "nb": 0, "avg": 40.0, "var": 0.0} + self.ping_stats = { + "last_rcv": 0, + "time_send": 0, + "nb": 0, + "avg": 40.0, + "var": 0.0, + } self.tokens = tokens self.__opus_profile = PYMUMBLE_AUDIO_TYPE_OPUS_PROFILE - self.receive_sound = False # set to True to treat incoming audio, otherwise it is simply ignored + self.receive_sound = ( + False # set to True to treat incoming audio, otherwise it is simply ignored + ) self.loop_rate = PYMUMBLE_LOOP_RATE @@ -75,19 +105,23 @@ def __init__(self, host, user, port=64738, password='', certfile=None, keyfile=N self.callbacks = callbacks.CallBacks() # callbacks management - self.ready_lock = threading.Lock() # released when the connection is fully established with the server + self.ready_lock = ( + threading.Lock() + ) # released when the connection is fully established with the server self.ready_lock.acquire() - def init_connection(self): + def init_connection(self) -> None: """Initialize variables that are local to a connection, (needed if the client automatically reconnect)""" - self.ready_lock.acquire(False) # reacquire the ready-lock in case of reconnection + self.ready_lock.acquire( + False + ) # reacquire the ready-lock in case of reconnection - self.connected = PYMUMBLE_CONN_STATE_NOT_CONNECTED - self.control_socket = None + self.connected: int = PYMUMBLE_CONN_STATE_NOT_CONNECTED + self.control_socket: typing.Optional[socket.socket] = None self.media_socket = None # Not implemented - for UDP media self.bandwidth = PYMUMBLE_BANDWIDTH # reset the outgoing bandwidth to it's default before connecting - self.server_max_bandwidth = None + self.server_max_bandwidth: typing.Optional[int] = None self.udp_active = False # defaults according to https://wiki.mumble.info/wiki/Murmur.ini @@ -95,15 +129,26 @@ def init_connection(self): self.server_max_message_length = 5000 self.server_max_image_message_length = 131072 - self.users = users.Users(self, self.callbacks) # contains the server's connected users information - self.channels = channels.Channels(self, self.callbacks) # contains the server's channels information + self.users = users.Users( + self, self.callbacks + ) # contains the server's connected users information + self.channels = channels.Channels( + self, self.callbacks + ) # contains the server's channels information self.blobs = blobs.Blobs(self) # manage the blob objects - self.sound_output = soundoutput.SoundOutput(self, PYMUMBLE_AUDIO_PER_PACKET, self.bandwidth, opus_profile=self.__opus_profile) # manage the outgoing sounds - self.commands = commands.Commands() # manage commands sent between the main and the mumble threads + self.sound_output = soundoutput.SoundOutput( + self, + PYMUMBLE_AUDIO_PER_PACKET, + self.bandwidth, + opus_profile=self.__opus_profile, + ) # manage the outgoing sounds + self.commands = ( + commands.Commands() + ) # manage commands sent between the main and the mumble threads self.receive_buffer = bytes() # initialize the control connection input buffer - def run(self): + def run(self) -> None: """Connect to the server and start the loop in its thread. Retry if requested""" self.mumble_thread = threading.current_thread() @@ -111,9 +156,13 @@ def run(self): while True: self.init_connection() # reset the connection-specific object members - if self.connect() >= PYMUMBLE_CONN_STATE_FAILED: # some error occurred, exit here + if ( + self.connect() >= PYMUMBLE_CONN_STATE_FAILED + ): # some error occurred, exit here self.ready_lock.release() - raise ConnectionRejectedError ("Connection error with the Mumble (murmur) Server") + raise ConnectionRejectedError( + "Connection error with the Mumble (murmur) Server" + ) break try: @@ -126,26 +175,42 @@ def run(self): time.sleep(PYMUMBLE_CONNECTION_RETRY_INTERVAL) - def connect(self): + def connect(self) -> int: """Connect to the server""" - # Get IPv4/IPv6 server address + # Get IPv4/IPv6 server address server_info = socket.getaddrinfo(self.host, self.port, type=socket.SOCK_STREAM) # Connect the SSL tunnel - self.Log.debug("connecting to %s (%s) on port %i.", self.host, server_info[0][1], self.port) + self.Log.debug( + "connecting to %s (%s) on port %i.", self.host, server_info[0][1], self.port + ) std_sock = socket.socket(server_info[0][0], socket.SOCK_STREAM) try: - self.control_socket = ssl.wrap_socket(std_sock, certfile=self.certfile, keyfile=self.keyfile, ssl_version=ssl.PROTOCOL_TLS) + self.control_socket = ssl.wrap_socket( + std_sock, + certfile=self.certfile, + keyfile=self.keyfile, + ssl_version=ssl.PROTOCOL_TLS, + ) except AttributeError: - self.control_socket = ssl.wrap_socket(std_sock, certfile=self.certfile, keyfile=self.keyfile, ssl_version=ssl.PROTOCOL_TLSv1) + self.control_socket = ssl.wrap_socket( + std_sock, + certfile=self.certfile, + keyfile=self.keyfile, + ssl_version=ssl.PROTOCOL_TLSv1, + ) try: self.control_socket.connect((self.host, self.port)) - self.control_socket.setblocking(0) + self.control_socket.setblocking(False) # Perform the Mumble authentication version = mumble_pb2.Version() - version.version = (PYMUMBLE_PROTOCOL_VERSION[0] << 16) + (PYMUMBLE_PROTOCOL_VERSION[1] << 8) + PYMUMBLE_PROTOCOL_VERSION[2] + version.version = ( + (PYMUMBLE_PROTOCOL_VERSION[0] << 16) + + (PYMUMBLE_PROTOCOL_VERSION[1] << 8) + + PYMUMBLE_PROTOCOL_VERSION[2] + ) version.release = self.application version.os = PYMUMBLE_OS_STRING version.os_version = PYMUMBLE_OS_VERSION_STRING @@ -166,7 +231,7 @@ def connect(self): self.connected = PYMUMBLE_CONN_STATE_AUTHENTICATING return self.connected - def loop(self): + def loop(self) -> None: """ Main loop waiting for a message from the server for maximum self.loop_rate time @@ -180,71 +245,95 @@ def loop(self): last_ping = time.time() # keep track of the last ping time # loop as long as the connection and the parent thread are alive - while self.connected not in (PYMUMBLE_CONN_STATE_NOT_CONNECTED, PYMUMBLE_CONN_STATE_FAILED) and self.parent_thread.is_alive(): - if last_ping + PYMUMBLE_PING_DELAY <= time.time(): # when it is time, send the ping + while ( + self.connected + not in (PYMUMBLE_CONN_STATE_NOT_CONNECTED, PYMUMBLE_CONN_STATE_FAILED) + and self.parent_thread.is_alive() + ): + if ( + last_ping + PYMUMBLE_PING_DELAY <= time.time() + ): # when it is time, send the ping self.ping() last_ping = time.time() if self.connected == PYMUMBLE_CONN_STATE_CONNECTED: while self.commands.is_cmd(): - self.treat_command(self.commands.pop_cmd()) # send the commands coming from the application to the server + cmd = typing.cast(messages.Cmd, self.commands.pop_cmd()) + self.treat_command( + cmd + ) # send the commands coming from the application to the server self.sound_output.send_audio() # send outgoing audio if available - (rlist, wlist, xlist) = select.select([self.control_socket], [], [self.control_socket], self.loop_rate) # wait for a socket activity + (rlist, wlist, xlist) = select.select( + [self.control_socket], [], [self.control_socket], self.loop_rate + ) # wait for a socket activity - if self.control_socket in rlist: # something to be read on the control socket + if ( + self.control_socket in rlist + ): # something to be read on the control socket self.read_control_messages() elif self.control_socket in xlist: # socket was closed - self.control_socket.close() + typing.cast(socket.socket, self.control_socket).close() self.connected = PYMUMBLE_CONN_STATE_NOT_CONNECTED - def ping(self): + def ping(self) -> None: """Send the keepalive through available channels""" ping = mumble_pb2.Ping() ping.timestamp = int(time.time()) - ping.tcp_ping_avg = self.ping_stats['avg'] - ping.tcp_ping_var = self.ping_stats['var'] - ping.tcp_packets = self.ping_stats['nb'] + ping.tcp_ping_avg = self.ping_stats["avg"] + ping.tcp_ping_var = self.ping_stats["var"] + ping.tcp_packets = self.ping_stats["nb"] self.Log.debug("sending: ping: %s", ping) self.send_message(PYMUMBLE_MSG_TYPES_PING, ping) - self.ping_stats['time_send'] = int(time.time() * 1000) - self.Log.debug(self.ping_stats['last_rcv']) - if self.ping_stats['last_rcv'] != 0 and int(time.time() * 1000) > self.ping_stats['last_rcv'] + (60 * 1000): + self.ping_stats["time_send"] = int(time.time() * 1000) + self.Log.debug(self.ping_stats["last_rcv"]) + if self.ping_stats["last_rcv"] != 0 and int( + time.time() * 1000 + ) > self.ping_stats["last_rcv"] + (60 * 1000): self.Log.debug("Ping too long ! Disconnected ?") self.connected = PYMUMBLE_CONN_STATE_NOT_CONNECTED - def ping_response(self, mess): - self.ping_stats['last_rcv'] = int(time.time() * 1000) - ping = int(time.time() * 1000) - self.ping_stats['time_send'] - old_avg = self.ping_stats['avg'] - nb = self.ping_stats['nb'] - new_avg = ((self.ping_stats['avg'] * nb) + ping) / (nb + 1) + def ping_response(self, mess: ProtoMessage) -> None: + self.ping_stats["last_rcv"] = int(time.time() * 1000) + ping = int(time.time() * 1000) - self.ping_stats["time_send"] + old_avg = self.ping_stats["avg"] + nb = self.ping_stats["nb"] + new_avg = ((self.ping_stats["avg"] * nb) + ping) / (nb + 1) try: - self.ping_stats['var'] = self.ping_stats['var'] + pow(old_avg - new_avg, 2) + (1 / nb) * pow(ping - new_avg, 2) + self.ping_stats["var"] = ( + self.ping_stats["var"] + + pow(old_avg - new_avg, 2) + + (1 / nb) * pow(ping - new_avg, 2) + ) except ZeroDivisionError: pass - self.ping_stats['avg'] = new_avg - self.ping_stats['nb'] += 1 + self.ping_stats["avg"] = new_avg + self.ping_stats["nb"] += 1 - def send_message(self, type, message): + def send_message(self, type: int, message: ProtoMessage) -> None: """Send a control message to the server""" - packet = struct.pack("!HL", type, message.ByteSize()) + message.SerializeToString() + packet = ( + struct.pack("!HL", type, message.ByteSize()) + message.SerializeToString() + ) - while len(packet) > 0: + while len(packet) > 0 and self.control_socket: self.Log.debug("sending message") sent = self.control_socket.send(packet) if sent < 0: raise socket.error("Server socket error") packet = packet[sent:] - def read_control_messages(self): + def read_control_messages(self) -> None: """Read control messages coming from the server""" # from tools import tohex # for debugging + if not self.control_socket: + return + try: buffer = self.control_socket.recv(PYMUMBLE_READ_BUFFER_SIZE) self.receive_buffer += buffer @@ -265,15 +354,19 @@ def read_control_messages(self): # self.Log.debug("message received : " + tohex(self.receive_buffer[0:size+6])) # for debugging - message = self.receive_buffer[6:size + 6] # get the control message - self.receive_buffer = self.receive_buffer[size + 6:] # remove from the buffer the read part + message = self.receive_buffer[6 : size + 6] # get the control message + self.receive_buffer = self.receive_buffer[ + size + 6 : + ] # remove from the buffer the read part self.dispatch_control_message(type, message) - def dispatch_control_message(self, type, message): + def dispatch_control_message(self, type: int, message: bytes) -> None: """Dispatch control messages based on their type""" self.Log.debug("dispatch control message") - if type == PYMUMBLE_MSG_TYPES_UDPTUNNEL: # audio encapsulated in control message + if ( + type == PYMUMBLE_MSG_TYPES_UDPTUNNEL + ): # audio encapsulated in control message self.sound_received(message) elif type == PYMUMBLE_MSG_TYPES_VERSION: @@ -300,7 +393,9 @@ def dispatch_control_message(self, type, message): self.ready_lock.release() raise ConnectionRejectedError(mess.reason) - elif type == PYMUMBLE_MSG_TYPES_SERVERSYNC: # this message finish the connection process + elif ( + type == PYMUMBLE_MSG_TYPES_SERVERSYNC + ): # this message finish the connection process mess = mumble_pb2.ServerSync() mess.ParseFromString(message) self.Log.debug("message: serversync : %s", mess) @@ -422,27 +517,32 @@ def dispatch_control_message(self, type, message): mess = mumble_pb2.ServerConfig() mess.ParseFromString(message) self.Log.debug("message: ServerConfig : %s", mess) - for line in str(mess).split('\n'): - items = line.split(':') + for line in str(mess).split("\n"): + items = line.split(":") if len(items) != 2: continue - if items[0] == 'allow_html': - self.server_allow_html = items[1].strip() == 'true' - elif items[0] == 'message_length': + if items[0] == "allow_html": + self.server_allow_html = items[1].strip() == "true" + elif items[0] == "message_length": self.server_max_message_length = int(items[1].strip()) - elif items[0] == 'image_message_length': + elif items[0] == "image_message_length": self.server_max_image_message_length = int(items[1].strip()) - def set_bandwidth(self, bandwidth): + def set_bandwidth(self, bandwidth: int) -> None: """Set the total allowed outgoing bandwidth""" - if self.server_max_bandwidth is not None and bandwidth > self.server_max_bandwidth: + if ( + self.server_max_bandwidth is not None + and bandwidth > self.server_max_bandwidth + ): self.bandwidth = self.server_max_bandwidth else: self.bandwidth = bandwidth - self.sound_output.set_bandwidth(self.bandwidth) # communicate the update to the outgoing audio manager + self.sound_output.set_bandwidth( + self.bandwidth + ) # communicate the update to the outgoing audio manager - def sound_received(self, message): + def sound_received(self, message: bytes) -> None: """Manage a received sound message""" # from tools import tohex # for debugging @@ -458,46 +558,78 @@ def sound_received(self, message): return session = tools.VarInt() # decode session id - pos += session.decode(message[pos:pos + 10]) + pos += session.decode(message[pos : pos + 10]) sequence = tools.VarInt() # decode sequence number - pos += sequence.decode(message[pos:pos + 10]) + pos += sequence.decode(message[pos : pos + 10]) - self.Log.debug("audio packet received from %i, sequence %i, type:%i, target:%i, length:%i", session.value, sequence.value, type, target, len(message)) + self.Log.debug( + "audio packet received from %i, sequence %i, type:%i, target:%i, length:%i", + session.value, + sequence.value, + type, + target, + len(message), + ) terminator = False # set to true if it's the last 10 ms audio frame for the packet (used with CELT codec) - while (pos < len(message)) and not terminator: # get the audio frames one by one + while ( + pos < len(message) + ) and not terminator: # get the audio frames one by one if type == PYMUMBLE_AUDIO_TYPE_OPUS: - size = tools.VarInt() # OPUS use varint for the frame length + varsize = tools.VarInt() # OPUS use varint for the frame length - pos += size.decode(message[pos:pos + 10]) - size = size.value + pos += varsize.decode(message[pos : pos + 10]) + size = varsize.value if not (size & 0x2000): # terminator is 0x2000 in the resulting int. terminator = True # should actually always be 0 as OPUS can use variable length audio frames - size &= 0x1fff # isolate the size from the terminator + size &= 0x1FFF # isolate the size from the terminator else: - (header,) = struct.unpack("!B", message[pos]) # CELT length and terminator is encoded in a 1 byte int + (header,) = struct.unpack( + "!B", message[pos : pos + 1] + ) # CELT length and terminator is encoded in a 1 byte int if not (header & 0b10000000): terminator = True size = header & 0b01111111 pos += 1 - self.Log.debug("Audio frame : time:%f, last:%s, size:%i, type:%i, target:%i, pos:%i", time.time(), str(terminator), size, type, target, pos - 1) + self.Log.debug( + "Audio frame : time:%f, last:%s, size:%i, type:%i, target:%i, pos:%i", + time.time(), + str(terminator), + size, + type, + target, + pos - 1, + ) if size > 0 and self.receive_sound: # if audio must be treated try: - newsound = self.users[session.value].sound.add(message[pos:pos + size], - sequence.value, - type, - target) # add the sound to the user's sound queue - - self.callbacks(PYMUMBLE_CLBK_SOUNDRECEIVED, self.users[session.value], newsound) - - sequence.value += int(round(newsound.duration / 1000 * 10)) # add 1 sequence per 10ms of audio - - self.Log.debug("Audio frame : time:%f last:%s, size:%i, uncompressed:%i, type:%i, target:%i", time.time(), str(terminator), size, newsound.size, type, target) + newsound = self.users[session.value].sound.add( + message[pos : pos + size], sequence.value, type, target + ) # add the sound to the user's sound queue + + assert newsound is not None + + self.callbacks( + PYMUMBLE_CLBK_SOUNDRECEIVED, self.users[session.value], newsound + ) + + sequence.value += int( + round(newsound.duration / 1000 * 10) + ) # add 1 sequence per 10ms of audio + + self.Log.debug( + "Audio frame : time:%f last:%s, size:%i, uncompressed:%i, type:%i, target:%i", + time.time(), + str(terminator), + size, + newsound.size, + type, + target, + ) except CodecNotSupportedError as msg: print(msg) except KeyError: # sound received after user removed @@ -510,42 +642,44 @@ def sound_received(self, message): # TODO: get position info - def set_application_string(self, string): + def set_application_string(self, string: str) -> None: """Set the application name, that can be viewed by other clients on the server""" self.application = string - def set_loop_rate(self, rate): + def set_loop_rate(self, rate: float) -> None: """Set the current main loop rate (pause per iteration)""" self.loop_rate = rate - def get_loop_rate(self): + def get_loop_rate(self) -> float: """Get the current main loop rate (pause per iteration)""" return self.loop_rate - def set_codec_profile(self, profile): + def set_codec_profile(self, profile: str) -> None: """set the audio profile""" if profile in ["audio", "voip"]: self.__opus_profile = profile else: raise ValueError("Unknown profile: " + str(profile)) - def get_codec_profile(self): + def get_codec_profile(self) -> str: """return the audio profile string""" return self.__opus_profile - def set_receive_sound(self, value): + def set_receive_sound(self, value: bool) -> None: """Enable or disable the management of incoming sounds""" if value: self.receive_sound = True else: self.receive_sound = False - def is_ready(self): + def is_ready(self) -> None: """Wait for the connection to be fully completed. To be used in the main thread""" self.ready_lock.acquire() self.ready_lock.release() - def execute_command(self, cmd, blocking=True): + def execute_command( + self, cmd: messages.Cmd, blocking: bool = True + ) -> threading.Lock: """Create a command to be sent to the server. To be used in the main thread""" self.is_ready() @@ -559,8 +693,9 @@ def execute_command(self, cmd, blocking=True): # TODO: manage a timeout for blocking commands. Currently, no command actually waits for the server to execute # The result of these commands should actually be checked against incoming server updates - def treat_command(self, cmd): + def treat_command(self, cmd: messages.Cmd) -> None: """Send the awaiting commands to the server. Used in the pymumble thread.""" + assert cmd.parameters is not None if cmd.cmd == PYMUMBLE_CMD_MOVE: userstate = mumble_pb2.UserState() userstate.session = cmd.parameters["session"] @@ -603,12 +738,12 @@ def treat_command(self, cmd): textvoicetarget.id = cmd.parameters["id"] targets = [] if cmd.parameters["id"] == 1: - voicetarget = mumble_pb2.VoiceTarget.Target() + voicetarget = mumble_pb2.VoiceTarget.Target() # type: ignore voicetarget.channel_id = cmd.parameters["targets"][0] targets.append(voicetarget) else: for target in cmd.parameters["targets"]: - voicetarget = mumble_pb2.VoiceTarget.Target() + voicetarget = mumble_pb2.VoiceTarget.Target() # type: ignore voicetarget.session.append(target) targets.append(voicetarget) textvoicetarget.targets.extend(targets) @@ -642,11 +777,12 @@ def treat_command(self, cmd): cmd.response = True self.commands.answer(cmd) - def get_max_message_length(self): + def get_max_message_length(self) -> int: return self.server_max_message_length - def get_max_image_length(self): + def get_max_image_length(self) -> int: return self.server_max_image_message_length - def my_channel(self): + def my_channel(self) -> channels.Channel: + assert self.users.myself is not None return self.channels[self.users.myself["channel_id"]] diff --git a/pymumble_py3/soundoutput.py b/pymumble_py3/soundoutput.py index 1101d6b..1f77953 100644 --- a/pymumble_py3/soundoutput.py +++ b/pymumble_py3/soundoutput.py @@ -1,15 +1,22 @@ # -*- coding: utf-8 -*- -from time import time +import socket import struct import threading -import socket +import typing +from time import time + import opuslib from .constants import * from .errors import CodecNotSupportedError -from .tools import VarInt from .messages import VoiceTarget +from .tools import VarInt + +if typing.TYPE_CHECKING: + from .mumble import Mumble + +ProtoMessage = typing.Any class SoundOutput: @@ -18,7 +25,13 @@ class SoundOutput: The buffering is the responsibility of the caller, any partial sound will be sent without delay """ - def __init__(self, mumble_object, audio_per_packet, bandwidth, opus_profile=PYMUMBLE_AUDIO_TYPE_OPUS_PROFILE): + def __init__( + self, + mumble_object: "Mumble", + audio_per_packet: float, + bandwidth: int, + opus_profile: str = PYMUMBLE_AUDIO_TYPE_OPUS_PROFILE, + ): """ audio_per_packet=packet audio duration in sec bandwidth=maximum total outgoing bandwidth @@ -27,60 +40,97 @@ def __init__(self, mumble_object, audio_per_packet, bandwidth, opus_profile=PYMU self.Log = self.mumble_object.Log - self.pcm = [] + self.pcm: typing.List[bytes] = [] self.lock = threading.Lock() - self.codec = None # codec currently requested by the server - self.encoder = None # codec instance currently used to encode - self.encoder_framesize = None # size of an audio frame for the current codec (OPUS=audio_per_packet, CELT=0.01s) + self.codec: typing.Optional[ + ProtoMessage + ] = None # codec currently requested by the server + self.encoder: typing.Optional[ + opuslib.Encoder + ] = None # codec instance currently used to encode + self.encoder_framesize: typing.Optional[ + float + ] = None # size of an audio frame for the current codec (OPUS=audio_per_packet, CELT=0.01s) self.opus_profile = opus_profile self.set_audio_per_packet(audio_per_packet) self.set_bandwidth(bandwidth) - self.codec_type = None # codec type number to be used in audio packets + self.codec_type: typing.Optional[ + int + ] = None # codec type number to be used in audio packets self.target = 0 # target is not implemented yet, so always 0 - self.sequence_start_time = 0 # time of sequence 1 - self.sequence_last_time = 0 # time of the last emitted packet + self.sequence_start_time = 0.0 # time of sequence 1 + self.sequence_last_time = 0.0 # time of the last emitted packet self.sequence = 0 # current sequence - def send_audio(self): + def send_audio(self) -> None: """send the available audio to the server, taking care of the timing""" - if not self.encoder or len(self.pcm) == 0: # no codec configured or no audio sent - return () + if ( + not self.encoder or len(self.pcm) == 0 + ): # no codec configured or no audio sent + return + + assert self.encoder_framesize is not None + assert self.codec_type is not None + assert self.mumble_object.control_socket is not None - samples = int(self.encoder_framesize * PYMUMBLE_SAMPLERATE * 2) # number of samples in an encoder frame + samples = int( + self.encoder_framesize * PYMUMBLE_SAMPLERATE * 2 + ) # number of samples in an encoder frame - while len(self.pcm) > 0 and self.sequence_last_time + self.audio_per_packet <= time(): # audio to send and time to send it (since last packet) + while ( + len(self.pcm) > 0 + and self.sequence_last_time + self.audio_per_packet <= time() + ): # audio to send and time to send it (since last packet) current_time = time() - if self.sequence_last_time + PYMUMBLE_SEQUENCE_RESET_INTERVAL <= current_time: # waited enough, resetting sequence to 0 + if ( + self.sequence_last_time + PYMUMBLE_SEQUENCE_RESET_INTERVAL + <= current_time + ): # waited enough, resetting sequence to 0 self.sequence = 0 self.sequence_start_time = current_time self.sequence_last_time = current_time - elif self.sequence_last_time + (self.audio_per_packet * 2) <= current_time: # give some slack (2*audio_per_frame) before interrupting a continuous sequence + elif ( + self.sequence_last_time + (self.audio_per_packet * 2) <= current_time + ): # give some slack (2*audio_per_frame) before interrupting a continuous sequence # calculating sequence after a pause - self.sequence = int((current_time - self.sequence_start_time) / PYMUMBLE_SEQUENCE_DURATION) - self.sequence_last_time = self.sequence_start_time + (self.sequence * PYMUMBLE_SEQUENCE_DURATION) + self.sequence = int( + (current_time - self.sequence_start_time) + / PYMUMBLE_SEQUENCE_DURATION + ) + self.sequence_last_time = self.sequence_start_time + ( + self.sequence * PYMUMBLE_SEQUENCE_DURATION + ) else: # continuous sound self.sequence += int(self.audio_per_packet / PYMUMBLE_SEQUENCE_DURATION) - self.sequence_last_time = self.sequence_start_time + (self.sequence * PYMUMBLE_SEQUENCE_DURATION) - - payload = bytearray() # content of the whole packet, without tcptunnel header - audio_encoded = 0 # audio time already in the packet - - while len(self.pcm) > 0 and audio_encoded < self.audio_per_packet: # more audio to be sent and packet not full + self.sequence_last_time = self.sequence_start_time + ( + self.sequence * PYMUMBLE_SEQUENCE_DURATION + ) + + payload = ( + bytearray() + ) # content of the whole packet, without tcptunnel header + audio_encoded = 0.0 # audio time already in the packet + + while ( + len(self.pcm) > 0 and audio_encoded < self.audio_per_packet + ): # more audio to be sent and packet not full self.lock.acquire() to_encode = self.pcm.pop(0) self.lock.release() - if len(to_encode) != samples: # pad to_encode if needed to match sample length - to_encode += b'\x00' * (samples - len(to_encode)) + if ( + len(to_encode) != samples + ): # pad to_encode if needed to match sample length + to_encode += b"\x00" * (samples - len(to_encode)) try: encoded = self.encoder.encode(to_encode, len(to_encode) // 2) except opuslib.exceptions.OpusError: - encoded = b'' + encoded = b"" audio_encoded += self.encoder_framesize @@ -88,25 +138,30 @@ def send_audio(self): if self.codec_type == PYMUMBLE_AUDIO_TYPE_OPUS: frameheader = VarInt(len(encoded)).encode() else: - frameheader = len(encoded) - if audio_encoded < self.audio_per_packet and len(self.pcm) > 0: # if not last frame for the packet, set the terminator bit - frameheader += (1 << 7) - frameheader = struct.pack('!B', frameheader) + encoded_len = len(encoded) + if ( + audio_encoded < self.audio_per_packet and len(self.pcm) > 0 + ): # if not last frame for the packet, set the terminator bit + encoded_len += 1 << 7 + frameheader = bytearray(struct.pack("!B", encoded_len)) payload += frameheader + encoded # add the frame to the packet header = self.codec_type << 5 # encapsulate in audio packet sequence = VarInt(self.sequence).encode() - udppacket = struct.pack('!B', header | self.target) + sequence + payload + udppacket = struct.pack("!B", header | self.target) + sequence + payload - self.Log.debug("audio packet to send: sequence:{sequence}, type:{type}, length:{len}".format( - sequence=self.sequence, - type=self.codec_type, - len=len(udppacket) - )) + self.Log.debug( + "audio packet to send: sequence:{sequence}, type:{type}, length:{len}".format( + sequence=self.sequence, type=self.codec_type, len=len(udppacket) + ) + ) - tcppacket = struct.pack("!HL", PYMUMBLE_MSG_TYPES_UDPTUNNEL, len(udppacket)) + udppacket # encapsulate in tcp tunnel + tcppacket = ( + struct.pack("!HL", PYMUMBLE_MSG_TYPES_UDPTUNNEL, len(udppacket)) + + udppacket + ) # encapsulate in tcp tunnel while len(tcppacket) > 0: sent = self.mumble_object.control_socket.send(tcppacket) @@ -114,48 +169,63 @@ def send_audio(self): raise socket.error("Server socket error") tcppacket = tcppacket[sent:] - def get_audio_per_packet(self): + def get_audio_per_packet(self) -> float: """return the configured length of a audio packet (in ms)""" return self.audio_per_packet - def set_audio_per_packet(self, audio_per_packet): + def set_audio_per_packet(self, audio_per_packet: float) -> None: """set the length of an audio packet (in ms)""" self.audio_per_packet = audio_per_packet self.create_encoder() - def get_bandwidth(self): + def get_bandwidth(self) -> int: """get the configured bandwidth for the audio output""" return self.bandwidth - def set_bandwidth(self, bandwidth): + def set_bandwidth(self, bandwidth: int) -> None: """set the bandwidth for the audio output""" self.bandwidth = bandwidth self._set_bandwidth() - def _set_bandwidth(self): + def _set_bandwidth(self) -> None: """do the calculation of the overhead and configure the actual bitrate for the codec""" if self.encoder: + assert self.encoder_framesize is not None + overhead_per_packet = 20 # IP header in bytes - overhead_per_packet += (3 * int(self.audio_per_packet / self.encoder_framesize)) # overhead per frame + overhead_per_packet += 3 * int( + self.audio_per_packet / self.encoder_framesize + ) # overhead per frame if self.mumble_object.udp_active: overhead_per_packet += 12 # UDP header else: overhead_per_packet += 20 # TCP header overhead_per_packet += 6 # TCPTunnel encapsulation - overhead_per_second = int(overhead_per_packet * 8 / self.audio_per_packet) # in bits + overhead_per_second = int( + overhead_per_packet * 8 / self.audio_per_packet + ) # in bits self.Log.debug( - "Bandwidth is {bandwidth}, downgrading to {bitrate} due to the protocol overhead".format(bandwidth=self.bandwidth, bitrate=self.bandwidth - overhead_per_second)) + "Bandwidth is {bandwidth}, downgrading to {bitrate} due to the protocol overhead".format( + bandwidth=self.bandwidth, + bitrate=self.bandwidth - overhead_per_second, + ) + ) self.encoder.bitrate = self.bandwidth - overhead_per_second - def add_sound(self, pcm): + def add_sound(self, pcm: bytes) -> None: """add sound to be sent (in PCM mono 16 bits signed format)""" if len(pcm) % 2 != 0: # check that the data is align on 16 bits raise Exception("pcm data must be mono 16 bits") - samples = int(self.encoder_framesize * PYMUMBLE_SAMPLERATE * 2) # number of samples in an encoder frame + if self.encoder_framesize is None: + raise Exception("encoder not initialized") + + samples = int( + self.encoder_framesize * PYMUMBLE_SAMPLERATE * 2 + ) # number of samples in an encoder frame self.lock.acquire() if len(self.pcm) and len(self.pcm[-1]) < samples: @@ -164,41 +234,45 @@ def add_sound(self, pcm): else: initial_offset = 0 for i in range(initial_offset, len(pcm), samples): - self.pcm.append(pcm[i:i + samples]) + self.pcm.append(pcm[i : i + samples]) self.lock.release() - def clear_buffer(self): + def clear_buffer(self) -> None: self.lock.acquire() self.pcm = [] self.lock.release() - def get_buffer_size(self): + def get_buffer_size(self) -> float: """return the size of the unsent buffer in sec""" - return sum(len(chunk) for chunk in self.pcm) / 2. / PYMUMBLE_SAMPLERATE + return sum(len(chunk) for chunk in self.pcm) / 2.0 / PYMUMBLE_SAMPLERATE - def set_default_codec(self, codecversion): + def set_default_codec(self, codecversion: ProtoMessage) -> None: """Set the default codec to be used to send packets""" self.codec = codecversion self.create_encoder() - def create_encoder(self): + def create_encoder(self) -> None: """create the encoder instance, and set related constants""" if not self.codec: - return () + return if self.codec.opus: - self.encoder = opuslib.Encoder(PYMUMBLE_SAMPLERATE, 1, self.opus_profile) + self.encoder = opuslib.Encoder( + PYMUMBLE_SAMPLERATE, PYMUMBLE_CHANNELS, self.opus_profile + ) self.encoder_framesize = self.audio_per_packet self.codec_type = PYMUMBLE_AUDIO_TYPE_OPUS else: - raise CodecNotSupportedError('') + raise CodecNotSupportedError("") self._set_bandwidth() - def set_whisper(self, target_id, channel=False): + def set_whisper( + self, target_id: typing.Union[typing.List[int], int], channel: bool = False + ) -> None: if not target_id: return - if type(target_id) is int: + if isinstance(target_id, int): target_id = [target_id] self.target = 2 if channel: @@ -206,7 +280,7 @@ def set_whisper(self, target_id, channel=False): cmd = VoiceTarget(self.target, target_id) self.mumble_object.execute_command(cmd) - def remove_whisper(self): + def remove_whisper(self) -> None: self.target = 0 cmd = VoiceTarget(self.target, []) self.mumble_object.execute_command(cmd) diff --git a/pymumble_py3/soundqueue.py b/pymumble_py3/soundqueue.py index 2510000..e1ffc4c 100644 --- a/pymumble_py3/soundqueue.py +++ b/pymumble_py3/soundqueue.py @@ -1,49 +1,56 @@ # -*- coding: utf-8 -*- import time -from threading import Lock +import typing from collections import deque +from threading import Lock import opuslib from .constants import * +if typing.TYPE_CHECKING: + from .mumble import Mumble + class SoundQueue: """ Per user storage of received audio frames Takes care of the decoding of the received audio """ - def __init__(self, mumble_object): + + def __init__(self, mumble_object: "Mumble"): self.mumble_object = mumble_object - - self.queue = deque() - self.start_sequence = None - self.start_time = None + + self.queue: typing.Deque[SoundChunk] = deque() + self.start_sequence: typing.Optional[int] = None + self.start_time: typing.Optional[float] = None self.receive_sound = True - + self.lock = Lock() - + # to be sure, create every supported decoders for all users # sometime, clients still use a codec for a while after server request another... - self.decoders = { - PYMUMBLE_AUDIO_TYPE_OPUS: opuslib.Decoder(PYMUMBLE_SAMPLERATE, 1) + self.decoders: typing.Dict[int, opuslib.Decoder] = { + PYMUMBLE_AUDIO_TYPE_OPUS: opuslib.Decoder(PYMUMBLE_SAMPLERATE, 1) } - def set_receive_sound(self, value): + def set_receive_sound(self, value: bool) -> None: """Define if received sounds must be kept or discarded in this specific queue (user)""" if value: self.receive_sound = True else: self.receive_sound = False - def add(self, audio, sequence, type, target): + def add( + self, audio: bytes, sequence: int, type: int, target: int + ) -> typing.Optional["SoundChunk"]: """Add a new audio frame to the queue, after decoding""" if not self.receive_sound: return None - + self.lock.acquire() - + try: pcm = self.decoders[type].decode(audio, PYMUMBLE_READ_BUFFER_SIZE) @@ -54,63 +61,90 @@ def add(self, audio, sequence, type, target): calculated_time = self.start_time else: # calculating position in current sequence - calculated_time = self.start_time + (sequence - self.start_sequence) * PYMUMBLE_SEQUENCE_DURATION + calculated_time = ( + typing.cast(float, self.start_time) + + (sequence - self.start_sequence) * PYMUMBLE_SEQUENCE_DURATION + ) - newsound = SoundChunk(pcm, sequence, len(pcm), calculated_time, type, target) + newsound = SoundChunk( + pcm, sequence, len(pcm), calculated_time, type, target + ) self.queue.appendleft(newsound) if len(self.queue) > 1 and self.queue[0].time < self.queue[1].time: # sort the audio chunk if it came out of order cpt = 0 - while cpt < len(self.queue) - 1 and self.queue[cpt].time < self.queue[cpt+1].time: - tmp = self.queue[cpt+1] - self.queue[cpt+1] = self.queue[cpt] + while ( + cpt < len(self.queue) - 1 + and self.queue[cpt].time < self.queue[cpt + 1].time + ): + tmp = self.queue[cpt + 1] + self.queue[cpt + 1] = self.queue[cpt] self.queue[cpt] = tmp self.lock.release() return newsound except KeyError: self.lock.release() - self.mumble_object.Log.error("Codec not supported (audio packet type {0})".format(type)) + self.mumble_object.Log.error( + "Codec not supported (audio packet type {0})".format(type) + ) except Exception as e: self.lock.release() - self.mumble_object.Log.error("error while decoding audio. sequence:{seq}, type:{type}. {error}".format(seq=sequence, type=type, error=str(e))) + self.mumble_object.Log.error( + "error while decoding audio. sequence:{seq}, type:{type}. {error}".format( + seq=sequence, type=type, error=str(e) + ) + ) + return None - def is_sound(self): + def is_sound(self) -> bool: """Boolean to check if there is a sound frame in the queue""" if len(self.queue) > 0: return True else: return False - - def get_sound(self, duration=None): + + def get_sound( + self, duration: typing.Optional[float] = None + ) -> typing.Optional["SoundChunk"]: """Return the first sound of the queue and discard it""" self.lock.acquire() - if len(self.queue) > 0: - if duration is None or self.first_sound().duration <= duration: - result = self.queue.pop() + sound = self.first_sound() + result: typing.Optional["SoundChunk"] = None + if sound is not None: + if duration is None or sound.duration <= duration: + result = self.queue.pop() else: - result = self.first_sound().extract_sound(duration) - else: - result = None + result = sound.extract_sound(duration) self.lock.release() return result - - def first_sound(self): + + def first_sound(self) -> typing.Optional["SoundChunk"]: """Return the first sound of the queue, but keep it""" if len(self.queue) > 0: return self.queue[-1] else: return None - + class SoundChunk: """ - Object that contains the actual audio frame, in PCM format""" - def __init__(self, pcm, sequence, size, calculated_time, type, target, timestamp=time.time()): - self.timestamp = timestamp # measured time of arrival of the sound + Object that contains the actual audio frame, in PCM format""" + + def __init__( + self, + pcm: bytes, + sequence: int, + size: int, + calculated_time: float, + type: int, + target: int, + timestamp: float = time.time(), + ): + self.timestamp = timestamp # measured time of arrival of the sound self.time = calculated_time # calculated time of arrival of the sound (based on sequence) self.pcm = pcm # audio data self.sequence = sequence # sequence of the packet @@ -118,23 +152,23 @@ def __init__(self, pcm, sequence, size, calculated_time, type, target, timestamp self.duration = float(size) / 2 / PYMUMBLE_SAMPLERATE # duration in sec self.type = type # type of the audio (codec) self.target = target # target of the audio - - def extract_sound(self, duration): + + def extract_sound(self, duration: float) -> "SoundChunk": """Extract part of the chunk, leaving a valid chunk for the remaining part""" - size = int(duration*2*PYMUMBLE_SAMPLERATE) + size = int(duration * 2 * PYMUMBLE_SAMPLERATE) result = SoundChunk( - self.pcm[:size], - self.sequence, - size, - self.time, - self.type, - self.target, - self.timestamp - ) - + self.pcm[:size], + self.sequence, + size, + self.time, + self.type, + self.target, + self.timestamp, + ) + self.pcm = self.pcm[size:] self.duration -= duration self.time += duration self.size -= size - + return result diff --git a/pymumble_py3/tools.py b/pymumble_py3/tools.py index 072d050..2b638f1 100644 --- a/pymumble_py3/tools.py +++ b/pymumble_py3/tools.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -import struct import builtins +import struct class InvalidVarInt(Exception): @@ -9,122 +9,120 @@ class InvalidVarInt(Exception): class VarInt: """Implement the varint type used in mumble""" - def __init__(self, value=0): + + def __init__(self, value: int = 0): self.value = value - - def encode(self): + + def encode(self) -> bytearray: """Encode an integer in the VarInt format, returning a binary string""" result = bytearray() value = abs(self.value) - - + if self.value < 0: if self.value >= -3: - return struct.pack("!B", (0b11111100 | value)) + return result + struct.pack("!B", (0b11111100 | value)) else: - result = struct.pack("!B", 0b11111000) - - if value <= 0x7f: - return result + struct.pack("!B", value) - elif value <= 0x3fff: + result += struct.pack("!B", 0b11111000) + + if value <= 0x7F: + return result + struct.pack("!B", value) + elif value <= 0x3FFF: return result + struct.pack("!H", 0x8000 | value) - elif value <= 0x1fffff: - return result + struct.pack("!BH", 0xc0 | (value >> 16), 0xffff & value) - elif value <= 0xfffffff: - return result + struct.pack("!L", 0xe0000000 | value) - elif value <= 0xffffffff: + elif value <= 0x1FFFFF: + return result + struct.pack("!BH", 0xC0 | (value >> 16), 0xFFFF & value) + elif value <= 0xFFFFFFF: + return result + struct.pack("!L", 0xE0000000 | value) + elif value <= 0xFFFFFFFF: return result + struct.pack("!BL", 0b11110000, value) else: return result + struct.pack("!BQ", 0b11110100, value) - def decode(self, value): + def decode(self, value: bytes) -> int: """Decode a VarInt contained in a binary string, returning an integer""" varint = value is_negative = False - result = None + result = 0 size = 0 - + if len(varint) <= 0: raise InvalidVarInt("length can't be 0") - - (first, ) = struct.unpack("!B", varint[0:1]) - + + (first,) = struct.unpack("!B", varint[0:1]) + if first & 0b11111100 == 0b11111000: is_negative = True size += 1 if len(varint) < 2: raise InvalidVarInt("Too short negative varint") varint = varint[1:] - (first, ) = struct.unpack("!B", varint[0:1]) + (first,) = struct.unpack("!B", varint[0:1]) if first & 0b10000000 == 0b00000000: - (result, ) = struct.unpack("!B", varint[0:1]) + (result,) = struct.unpack("!B", varint[0:1]) size += 1 elif first & 0b11111100 == 0b11111100: - (result, ) = struct.unpack("!B", varint[0:1]) + (result,) = struct.unpack("!B", varint[0:1]) result &= 0b00000011 is_negative = True size += 1 elif first & 0b11000000 == 0b10000000: if len(varint) < 2: raise InvalidVarInt("Too short 2 bytes varint") - (result, ) = struct.unpack("!H", varint[:2]) + (result,) = struct.unpack("!H", varint[:2]) result &= 0b0011111111111111 size += 2 elif first & 0b11100000 == 0b11000000: if len(varint) < 3: raise InvalidVarInt("Too short 3 bytes varint") - (result, ) = struct.unpack("!B", varint[0:1]) + (result,) = struct.unpack("!B", varint[0:1]) result &= 0b00011111 - (tmp, ) = struct.unpack("!H", varint[1:3]) + (tmp,) = struct.unpack("!H", varint[1:3]) result = (result << 16) + tmp size += 3 elif first & 0b11110000 == 0b11100000: if len(varint) < 4: raise InvalidVarInt("Too short 4 bytes varint") - (result, ) = struct.unpack("!L", varint[:4]) - result &= 0x0fffffff + (result,) = struct.unpack("!L", varint[:4]) + result &= 0x0FFFFFFF size += 4 elif first & 0b11111100 == 0b11110000: if len(varint) < 5: raise InvalidVarInt("Too short 5 bytes varint") - (result, ) = struct.unpack("!L", varint[1:5]) + (result,) = struct.unpack("!L", varint[1:5]) size += 5 elif first & 0b11111100 == 0b11110100: if len(varint) < 9: raise InvalidVarInt("Too short 9 bytes varint") - (result, ) = struct.unpack("!Q", varint[1:9]) + (result,) = struct.unpack("!Q", varint[1:9]) size += 9 + else: + return size if is_negative: - self.value = - result + self.value = -result else: self.value = result - + return size -def tohex(buffer): +def tohex(buffer: str) -> str: """Used for debugging. Output a sting in hex format""" result = "\n" cpt1 = 0 cpt2 = 0 - + for byte in buffer: result += hex(ord(byte))[2:].zfill(2) cpt1 += 1 - + if cpt1 >= 4: result += " " cpt1 = 0 cpt2 += 1 - + if cpt2 >= 10: result += "\n" cpt2 = 0 - - return result - - - + return result diff --git a/pymumble_py3/users.py b/pymumble_py3/users.py index ba28ed0..3c57fc8 100644 --- a/pymumble_py3/users.py +++ b/pymumble_py3/users.py @@ -1,23 +1,34 @@ # -*- coding: utf-8 -*- -from .constants import * -from .errors import TextTooLongError, ImageTooBigError +import typing from threading import Lock -from . import soundqueue -from . import messages -from . import mumble_pb2 -class Users(dict): +from . import messages, mumble_pb2, soundqueue +from .callbacks import CallBacks +from .constants import * +from .errors import ImageTooBigError, TextTooLongError + +if typing.TYPE_CHECKING: + from .mumble import Mumble + +ProtoMessage = typing.Any + + +class Users(typing.Dict[int, "User"]): """Object that stores and update all connected users""" - def __init__(self, mumble_object, callbacks): + def __init__(self, mumble_object: "Mumble", callbacks: CallBacks): self.mumble_object = mumble_object self.callbacks = callbacks - self.myself = None # user object of the pymumble thread itself - self.myself_session = None # session number of the pymumble thread itself + self.myself: typing.Optional[ + "User" + ] = None # user object of the pymumble thread itself + self.myself_session: typing.Optional[ + int + ] = None # session number of the pymumble thread itself self.lock = Lock() - def update(self, message): + def update(self, message: ProtoMessage) -> None: # type: ignore """Update a user information, based on the incoming message""" self.lock.acquire() @@ -32,7 +43,7 @@ def update(self, message): self.lock.release() - def remove(self, message): + def remove(self, message: ProtoMessage) -> None: """Remove a user object based on server info""" self.lock.acquire() @@ -43,29 +54,31 @@ def remove(self, message): self.lock.release() - def set_myself(self, session): + def set_myself(self, session: int) -> None: """Set the "myself" user""" self.myself_session = session if session in self: self.myself = self[session] - def count(self): + def count(self) -> int: """Return the count of connected users""" return len(self) -class User(dict): +class User(typing.Dict[str, typing.Any]): """Object that store one user""" - def __init__(self, mumble_object, message): + def __init__(self, mumble_object: "Mumble", message: ProtoMessage): self.mumble_object = mumble_object self["session"] = message.session self["channel_id"] = 0 self.update(message) - self.sound = soundqueue.SoundQueue(self.mumble_object) # will hold this user incoming audio + self.sound = soundqueue.SoundQueue( + self.mumble_object + ) # will hold this user incoming audio - def update(self, message): + def update(self, message: ProtoMessage) -> typing.Dict[str, typing.Any]: # type: ignore """Update user state, based on an incoming message""" actions = dict() @@ -90,7 +103,9 @@ def update(self, message): return actions # return a dict, useful for the callback functions - def update_field(self, name, field): + def update_field( + self, name: str, field: typing.Any + ) -> typing.Dict[str, typing.Any]: """Update one state value for a user""" actions = dict() if name not in self or self[name] != field: @@ -99,14 +114,16 @@ def update_field(self, name, field): return actions - def get_property(self, property): + def get_property(self, property: str) -> typing.Any: if property in self: return self[property] else: return None - def mute(self): + def mute(self) -> None: """Mute a user""" + assert self.mumble_object.users.myself_session is not None + params = {"session": self["session"]} if self["session"] == self.mumble_object.users.myself_session: @@ -117,8 +134,10 @@ def mute(self): cmd = messages.ModUserState(self.mumble_object.users.myself_session, params) self.mumble_object.execute_command(cmd) - def unmute(self): + def unmute(self) -> None: """Unmute a user""" + assert self.mumble_object.users.myself_session is not None + params = {"session": self["session"]} if self["session"] == self.mumble_object.users.myself_session: @@ -129,8 +148,10 @@ def unmute(self): cmd = messages.ModUserState(self.mumble_object.users.myself_session, params) self.mumble_object.execute_command(cmd) - def deafen(self): + def deafen(self) -> None: """Deafen a user""" + assert self.mumble_object.users.myself_session is not None + params = {"session": self["session"]} if self["session"] == self.mumble_object.users.myself_session: @@ -141,8 +162,10 @@ def deafen(self): cmd = messages.ModUserState(self.mumble_object.users.myself_session, params) self.mumble_object.execute_command(cmd) - def undeafen(self): + def undeafen(self) -> None: """Undeafen a user""" + assert self.mumble_object.users.myself_session is not None + params = {"session": self["session"]} if self["session"] == self.mumble_object.users.myself_session: @@ -153,63 +176,70 @@ def undeafen(self): cmd = messages.ModUserState(self.mumble_object.users.myself_session, params) self.mumble_object.execute_command(cmd) - def suppress(self): + def suppress(self) -> None: """Disable a user""" - params = {"session": self["session"], - "suppress": True} + assert self.mumble_object.users.myself_session is not None + + params = {"session": self["session"], "suppress": True} cmd = messages.ModUserState(self.mumble_object.users.myself_session, params) self.mumble_object.execute_command(cmd) - def unsuppress(self): + def unsuppress(self) -> None: """Enable a user""" - params = {"session": self["session"], - "suppress": False} + assert self.mumble_object.users.myself_session is not None + + params = {"session": self["session"], "suppress": False} cmd = messages.ModUserState(self.mumble_object.users.myself_session, params) self.mumble_object.execute_command(cmd) - def recording(self): + def recording(self) -> None: """Set the user as recording""" - params = {"session": self["session"], - "recording": True} + assert self.mumble_object.users.myself_session is not None + + params = {"session": self["session"], "recording": True} cmd = messages.ModUserState(self.mumble_object.users.myself_session, params) self.mumble_object.execute_command(cmd) - def unrecording(self): + def unrecording(self) -> None: """Set the user as not recording""" - params = {"session": self["session"], - "recording": False} + assert self.mumble_object.users.myself_session is not None + + params = {"session": self["session"], "recording": False} cmd = messages.ModUserState(self.mumble_object.users.myself_session, params) self.mumble_object.execute_command(cmd) - def comment(self, comment): + def comment(self, comment: str) -> None: """Set the user comment""" - params = {"session": self["session"], - "comment": comment} + assert self.mumble_object.users.myself_session is not None + + params = {"session": self["session"], "comment": comment} cmd = messages.ModUserState(self.mumble_object.users.myself_session, params) self.mumble_object.execute_command(cmd) - def texture(self, texture): + def texture(self, texture: bytes) -> None: """Set the user texture""" - params = {"session": self["session"], - "texture": texture} + assert self.mumble_object.users.myself_session is not None + + params = {"session": self["session"], "texture": texture} cmd = messages.ModUserState(self.mumble_object.users.myself_session, params) self.mumble_object.execute_command(cmd) - - def register(self): + + def register(self) -> None: """Register the user (mostly for myself)""" - params = {"session": self["session"], - "user_id": 0} - + assert self.mumble_object.users.myself_session is not None + + params = {"session": self["session"], "user_id": 0} + cmd = messages.ModUserState(self.mumble_object.users.myself_session, params) self.mumble_object.execute_command(cmd) - def move_in(self, channel_id, token=None): + def move_in(self, channel_id: int, token: typing.Optional[str] = None) -> None: if token: authenticate = mumble_pb2.Authenticate() authenticate.username = self.mumble_object.user @@ -218,13 +248,16 @@ def move_in(self, channel_id, token=None): authenticate.tokens.extend([token]) authenticate.opus = True self.mumble_object.Log.debug("sending: authenticate: %s", authenticate) - self.mumble_object.send_message(PYMUMBLE_MSG_TYPES_AUTHENTICATE, authenticate) + self.mumble_object.send_message( + PYMUMBLE_MSG_TYPES_AUTHENTICATE, authenticate + ) session = self.mumble_object.users.myself_session + assert session is not None cmd = messages.MoveCmd(session, channel_id) self.mumble_object.execute_command(cmd) - def send_text_message(self, message): + def send_text_message(self, message: str) -> None: """Send a text message to the user.""" # TODO: This check should be done inside execute_command() diff --git a/setup.py b/setup.py index d05cf5e..4ff53d7 100644 --- a/setup.py +++ b/setup.py @@ -3,21 +3,23 @@ setup( name="pymumble", description="Python 3 version of pymumble, Mumble library used for multiple uses like making mumble bot.", - version='0.3.1', - author='Robert Hendrickx', - author_email='rober@percu.be', - maintainer='Azlux', - maintainer_email='azlux@outlook.com', - url='https://github.com/azlux/pymumble', - license='GPLv3', - packages=['pymumble_py3'], - download_url='https://github.com/azlux/pymumble/archive/pymumble_py3.zip', - classifiers=['Development Status :: 3 - Alpha', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.3', - 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6',] + version="0.3.1", + author="Robert Hendrickx", + author_email="rober@percu.be", + maintainer="Azlux", + maintainer_email="azlux@outlook.com", + url="https://github.com/azlux/pymumble", + license="GPLv3", + packages=["pymumble_py3"], + download_url="https://github.com/azlux/pymumble/archive/pymumble_py3.zip", + classifiers=[ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.3", + "Programming Language :: Python :: 3.4", + "Programming Language :: Python :: 3.5", + "Programming Language :: Python :: 3.6", + ], )