From f8e2a5619ff66b460ffe9a98db58f62f3802c64c Mon Sep 17 00:00:00 2001 From: taras Date: Tue, 8 Oct 2024 23:04:04 +0200 Subject: [PATCH 1/7] WIP: First implementation of measure_ping_pong_latency --- picows/picows.pxd | 4 ++++ picows/picows.pyx | 41 ++++++++++++++++++++++++++++++++++++++++- tests/test_autoping.py | 25 +++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 1 deletion(-) diff --git a/picows/picows.pxd b/picows/picows.pxd index 59f3a5d..c9e0918 100644 --- a/picows/picows.pxd +++ b/picows/picows.pxd @@ -84,6 +84,8 @@ cdef class WSTransport: readonly bint is_secure bint auto_ping_expect_pong + object pong_received_at_fut + object listener_proxy object _logger #: Logger bint _log_debug_enabled @@ -109,6 +111,8 @@ cdef class WSTransport: cdef class WSListener: + cdef object __weakref__ + cpdef on_ws_connected(self, WSTransport transport) cpdef on_ws_frame(self, WSTransport transport, WSFrame frame) cpdef on_ws_disconnected(self, WSTransport transport) diff --git a/picows/picows.pyx b/picows/picows.pyx index 52b091e..a419026 100644 --- a/picows/picows.pyx +++ b/picows/picows.pyx @@ -1,4 +1,5 @@ import asyncio +import weakref from base64 import b64encode, b64decode import binascii from hashlib import sha1 @@ -393,6 +394,8 @@ cdef class WSTransport: self.is_client_side = is_client_side self.is_secure = underlying_transport.get_extra_info('ssl_object') is not None self.auto_ping_expect_pong = False + self.pong_received_at_fut = None + self.listener_proxy = None self._logger = logger self._log_debug_enabled = self._logger.isEnabledFor(PICOWS_DEBUG_LL) self._disconnected_future = loop.create_future() @@ -571,6 +574,27 @@ cdef class WSTransport: """ await asyncio.shield(self._disconnected_future) + async def measure_ping_pong_latency(self, int rounds): + cdef double ping_at + cdef double pong_at + cdef int i + cdef list results = [] + + # If auto-ping is enabled and currently waiting for pong then + # wait until we receive it and only then proceed with our own pings + if self.auto_ping_expect_pong: + self.pong_received_at_fut = asyncio.get_running_loop().create_future() + await self.pong_received_at_fut + + for i in range(rounds): + self.listener_proxy.send_user_specific_ping(self) + self.pong_received_at_fut = asyncio.get_running_loop().create_future() + ping_at = picows_get_monotonic_time() + pong_at = await self.pong_received_at_fut + results.append(pong_at - ping_at) + + return results + cpdef notify_user_specific_pong_received(self): """ Notify the auto-ping loop that a user-specific pong message @@ -588,6 +612,11 @@ cdef class WSTransport: In such cases, the method simply does nothing. """ self.auto_ping_expect_pong = False + + if self.pong_received_at_fut is not None: + self.pong_received_at_fut.set_result(picows_get_monotonic_time()) + self.pong_received_at_fut = None + if self._log_debug_enabled: self._logger.log(PICOWS_DEBUG_LL, "Reset expect_pong flag because notify_user_specific_pong_received() called") @@ -939,6 +968,7 @@ cdef class WSProtocol: # Upgrade response hasn't fully arrived yet return False self.listener = self._listener_factory() + self.transport.listener_proxy = weakref.proxy(self.listener) self._listener_factory = None except Exception as ex: self.transport.disconnect() @@ -960,6 +990,7 @@ cdef class WSProtocol: self._listener_factory = None try: self.listener = listener_factory(upgrade_request) + self.transport.listener_proxy = weakref.proxy(self.listener) except Exception as ex: self.transport._send_internal_server_error(str(ex)) self.transport.disconnect() @@ -1003,6 +1034,12 @@ cdef class WSProtocol: if self._log_debug_enabled: self._logger.log(PICOWS_DEBUG_LL, "Send PING because no new data over the last %s seconds", self._auto_ping_idle_timeout) + if self.transport.pong_received_at_fut is not None: + # measure_ping_pong_latency is currently doing it's own ping-pongs + # set _last_data_time to now and sleep + self._last_data_time = picows_get_monotonic_time() + continue + self.listener.send_user_specific_ping(self.transport) self.transport.auto_ping_expect_pong = True @@ -1285,9 +1322,11 @@ cdef class WSProtocol: cdef inline _invoke_on_ws_frame(self, WSFrame frame): try: - if self._enable_auto_ping and self.transport.auto_ping_expect_pong: + if self._enable_auto_ping and self.transport.auto_ping_expect_pong or self.transport.pong_received_at_fut is not None: if self.listener.is_user_specific_pong(frame): self.transport.auto_ping_expect_pong = False + self.transport.pong_received_at_fut.set_result(picows_get_monotonic_time()) + self.transport.pong_received_at_fut = None if self._log_debug_enabled: self._logger.log(PICOWS_DEBUG_LL, "Received PONG for the previously sent PING, reset expect_pong flag") return diff --git a/tests/test_autoping.py b/tests/test_autoping.py index b111cf6..fa8bd42 100644 --- a/tests/test_autoping.py +++ b/tests/test_autoping.py @@ -251,3 +251,28 @@ def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame): assert listener.frames[0].msg_type == picows.WSMsgType.PING assert listener.frames[1].msg_type == picows.WSMsgType.CLOSE assert listener.frames[1].close_code == picows.WSCloseCode.INTERNAL_ERROR + + +async def test_roundtrip_latency(): + class ServerClientListener(picows.WSListener): + def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame): + if frame.msg_type == picows.WSMsgType.PING: + transport.send_pong(frame.get_payload_as_bytes()) + + server = await picows.ws_create_server(lambda _: ServerClientListener(), + "127.0.0.1", 0) + + class ClientListener(picows.WSListener): + async def on_connected(self, transport: picows.WSTransport): + results = await transport.measure_ping_pong_latency(5) + print(results) + transport.disconnect() + + def on_ws_connected(self, transport: picows.WSTransport): + asyncio.get_running_loop().create_task(self.on_connected(transport)) + + async with ServerAsyncContext(server): + url = f"ws://127.0.0.1:{server.sockets[0].getsockname()[1]}" + (transport, listener) = await picows.ws_connect(ClientListener, url, enable_auto_ping=True) + async with async_timeout.timeout(TIMEOUT): + await transport.wait_disconnected() From 3d927d69c4ef4a8bed344770958bf03bcd7b75fd Mon Sep 17 00:00:00 2001 From: taras Date: Thu, 10 Oct 2024 15:59:56 +0200 Subject: [PATCH 2/7] Add tests, improve logging, fix corner cases --- README.rst | 4 ++-- picows/picows.pyx | 27 +++++++++++++++++++-------- tests/test_autoping.py | 41 +++++++++++++++++++++++++++++++---------- tests/test_basics.py | 2 +- 4 files changed, 53 insertions(+), 21 deletions(-) diff --git a/README.rst b/README.rst index e8b67d8..b44b729 100644 --- a/README.rst +++ b/README.rst @@ -183,8 +183,8 @@ Contributing / Building From Source $ python setup.py build_ext --inplace $ pytest -s -v - # Run specific test - $ pytest -s -v -k test_client_handshake_timeout[uvloop-plain] + # Run specific test with picows debug logs enabled + $ pytest -s -v -k test_client_handshake_timeout[uvloop-plain] --log-cli-level 9 5. Run benchmark:: diff --git a/picows/picows.pyx b/picows/picows.pyx index a419026..e104a26 100644 --- a/picows/picows.pyx +++ b/picows/picows.pyx @@ -617,9 +617,14 @@ cdef class WSTransport: self.pong_received_at_fut.set_result(picows_get_monotonic_time()) self.pong_received_at_fut = None - if self._log_debug_enabled: - self._logger.log(PICOWS_DEBUG_LL, - "Reset expect_pong flag because notify_user_specific_pong_received() called") + if self._log_debug_enabled: + self._logger.log(PICOWS_DEBUG_LL, + "notify_user_specific_pong_received() for PONG(measure_ping_pong_latency), reset expect_pong") + else: + if self._log_debug_enabled: + self._logger.log(PICOWS_DEBUG_LL, + "notify_user_specific_pong_received() for PONG(idle timeout), reset expect_pong") + cdef _send_http_handshake(self, bytes ws_path, bytes host_port, bytes websocket_key_b64): initial_handshake = (b"GET %b HTTP/1.1\r\n" @@ -990,7 +995,8 @@ cdef class WSProtocol: self._listener_factory = None try: self.listener = listener_factory(upgrade_request) - self.transport.listener_proxy = weakref.proxy(self.listener) + if self.listener is not None: + self.transport.listener_proxy = weakref.proxy(self.listener) except Exception as ex: self.transport._send_internal_server_error(str(ex)) self.transport.disconnect() @@ -1325,10 +1331,15 @@ cdef class WSProtocol: if self._enable_auto_ping and self.transport.auto_ping_expect_pong or self.transport.pong_received_at_fut is not None: if self.listener.is_user_specific_pong(frame): self.transport.auto_ping_expect_pong = False - self.transport.pong_received_at_fut.set_result(picows_get_monotonic_time()) - self.transport.pong_received_at_fut = None - if self._log_debug_enabled: - self._logger.log(PICOWS_DEBUG_LL, "Received PONG for the previously sent PING, reset expect_pong flag") + if self.transport.pong_received_at_fut is not None: + self.transport.pong_received_at_fut.set_result(picows_get_monotonic_time()) + self.transport.pong_received_at_fut = None + if self._log_debug_enabled: + self._logger.log(PICOWS_DEBUG_LL, "Received PONG for the previously sent PING(measure_ping_pong_latency), reset expect_pong flag") + else: + if self._log_debug_enabled: + self._logger.log(PICOWS_DEBUG_LL, "Received PONG for the previously sent PING(idle timeout), reset expect_pong flag") + return self.listener.on_ws_frame(self.transport, frame) diff --git a/tests/test_autoping.py b/tests/test_autoping.py index fa8bd42..584bc41 100644 --- a/tests/test_autoping.py +++ b/tests/test_autoping.py @@ -2,9 +2,11 @@ from idlelib.pyparse import trans import async_timeout +import pytest from aiohttp import WSMsgType import picows +from picows import WSFrame from tests.utils import ServerAsyncContext, TIMEOUT, TextFrame, CloseFrame, \ BinaryFrame, materialize_frame @@ -253,7 +255,9 @@ def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame): assert listener.frames[1].close_code == picows.WSCloseCode.INTERNAL_ERROR -async def test_roundtrip_latency(): +@pytest.mark.parametrize("use_notify", [False, True], ids=["dont_use_notify", "use_notify"]) +@pytest.mark.parametrize("with_auto_ping", [False, True], ids=["no_auto_ping", "with_auto_ping"]) +async def test_roundtrip_latency(use_notify, with_auto_ping): class ServerClientListener(picows.WSListener): def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame): if frame.msg_type == picows.WSMsgType.PING: @@ -262,17 +266,34 @@ def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame): server = await picows.ws_create_server(lambda _: ServerClientListener(), "127.0.0.1", 0) - class ClientListener(picows.WSListener): - async def on_connected(self, transport: picows.WSTransport): - results = await transport.measure_ping_pong_latency(5) - print(results) - transport.disconnect() + class ClientListenerUseNotify(picows.WSListener): + def is_user_specific_pong(self, frame): + return False - def on_ws_connected(self, transport: picows.WSTransport): - asyncio.get_running_loop().create_task(self.on_connected(transport)) + def on_ws_frame(self, transport, frame): + if frame.msg_type == picows.WSMsgType.PONG: + transport.notify_user_specific_pong_received() async with ServerAsyncContext(server): url = f"ws://127.0.0.1:{server.sockets[0].getsockname()[1]}" - (transport, listener) = await picows.ws_connect(ClientListener, url, enable_auto_ping=True) - async with async_timeout.timeout(TIMEOUT): + listener_factory = ClientListenerUseNotify if use_notify else picows.WSListener + (transport, listener) = await picows.ws_connect(listener_factory, url, + enable_auto_ping=with_auto_ping, + auto_ping_idle_timeout=0.5, + auto_ping_reply_timeout=0.5) + async with async_timeout.timeout(2): + results = await transport.measure_ping_pong_latency(5) + assert len(results) == 5 + for l in results: + assert l > 0 and l < 1.0 + + await asyncio.sleep(0.7) + + async with async_timeout.timeout(2): + results = await transport.measure_ping_pong_latency(5) + assert len(results) == 5 + for l in results: + assert l > 0 and l < 1.0 + transport.disconnect() + await transport.wait_disconnected() diff --git a/tests/test_basics.py b/tests/test_basics.py index e13f544..cf75807 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -265,7 +265,7 @@ def on_ws_connected(self, transport: picows.WSTransport): await transport.wait_disconnected() -@pytest.mark.parametrize("disconnect_on_exception", [True, False]) +@pytest.mark.parametrize("disconnect_on_exception", [True, False], ids=["disconnect_on_exception", "no_disconnect_on_exception"]) async def test_ws_on_frame_throw(disconnect_on_exception): class ServerClientListener(picows.WSListener): def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame): From a6df8dcaf9a3e327c5a10ce2c1c35cca4e6fc669 Mon Sep 17 00:00:00 2001 From: taras Date: Thu, 10 Oct 2024 16:16:47 +0200 Subject: [PATCH 3/7] Test exception on disconnect in measure_ping_pong_latency --- picows/picows.pyx | 9 +++++++-- tests/test_autoping.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/picows/picows.pyx b/picows/picows.pyx index e104a26..5c47636 100644 --- a/picows/picows.pyx +++ b/picows/picows.pyx @@ -579,18 +579,19 @@ cdef class WSTransport: cdef double pong_at cdef int i cdef list results = [] + cdef object shield = asyncio.shield # If auto-ping is enabled and currently waiting for pong then # wait until we receive it and only then proceed with our own pings if self.auto_ping_expect_pong: self.pong_received_at_fut = asyncio.get_running_loop().create_future() - await self.pong_received_at_fut + await shield(self.pong_received_at_fut) for i in range(rounds): self.listener_proxy.send_user_specific_ping(self) self.pong_received_at_fut = asyncio.get_running_loop().create_future() ping_at = picows_get_monotonic_time() - pong_at = await self.pong_received_at_fut + pong_at = await shield(self.pong_received_at_fut) results.append(pong_at - ping_at) return results @@ -863,6 +864,10 @@ cdef class WSProtocol: if self._auto_ping_loop_task is not None and not self._auto_ping_loop_task.done(): self._auto_ping_loop_task.cancel() + if self.transport.pong_received_at_fut is not None: + self.transport.pong_received_at_fut.set_exception(ConnectionResetError()) + self.transport.pong_received_at_fut = None + self.transport._mark_disconnected() def eof_received(self) -> bool: diff --git a/tests/test_autoping.py b/tests/test_autoping.py index 584bc41..8204f19 100644 --- a/tests/test_autoping.py +++ b/tests/test_autoping.py @@ -297,3 +297,31 @@ def on_ws_frame(self, transport, frame): transport.disconnect() await transport.wait_disconnected() + + +@pytest.mark.parametrize("with_auto_ping", [False, True], ids=["no_auto_ping", "with_auto_ping"]) +async def test_roundtrip_latency_disconnect(with_auto_ping): + class ServerClientListener(picows.WSListener): + def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame): + if frame.msg_type == picows.WSMsgType.PING: + transport.send_pong(frame.get_payload_as_bytes()) + + server = await picows.ws_create_server(lambda _: ServerClientListener(), + "127.0.0.1", 0) + + class ClientListener(picows.WSListener): + def send_user_specific_ping(self, transport): + transport.send_ping() + # Disconnect immediately to test that the client will not hang up + # waiting indefinitely for PONG + transport.disconnect() + + async with ServerAsyncContext(server): + url = f"ws://127.0.0.1:{server.sockets[0].getsockname()[1]}" + (transport, listener) = await picows.ws_connect(ClientListener, url, + enable_auto_ping=with_auto_ping, + auto_ping_idle_timeout=0.5, + auto_ping_reply_timeout=0.5) + async with async_timeout.timeout(TIMEOUT): + with pytest.raises(ConnectionResetError): + await transport.measure_ping_pong_latency(5) From b443683db3da2fe61b3101b2a651d1e17dbccf16 Mon Sep 17 00:00:00 2001 From: taras Date: Thu, 10 Oct 2024 16:22:36 +0200 Subject: [PATCH 4/7] Cleanups --- picows/picows.pxd | 2 +- picows/picows.pyx | 40 ++++++++++++++++++++-------------------- tests/test_autoping.py | 6 +++--- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/picows/picows.pxd b/picows/picows.pxd index c9e0918..57c5043 100644 --- a/picows/picows.pxd +++ b/picows/picows.pxd @@ -84,7 +84,7 @@ cdef class WSTransport: readonly bint is_secure bint auto_ping_expect_pong - object pong_received_at_fut + object pong_received_at_future object listener_proxy object _logger #: Logger diff --git a/picows/picows.pyx b/picows/picows.pyx index 5c47636..ded7a71 100644 --- a/picows/picows.pyx +++ b/picows/picows.pyx @@ -394,7 +394,7 @@ cdef class WSTransport: self.is_client_side = is_client_side self.is_secure = underlying_transport.get_extra_info('ssl_object') is not None self.auto_ping_expect_pong = False - self.pong_received_at_fut = None + self.pong_received_at_future = None self.listener_proxy = None self._logger = logger self._log_debug_enabled = self._logger.isEnabledFor(PICOWS_DEBUG_LL) @@ -574,7 +574,7 @@ cdef class WSTransport: """ await asyncio.shield(self._disconnected_future) - async def measure_ping_pong_latency(self, int rounds): + async def measure_roundtrip_latency(self, int rounds): cdef double ping_at cdef double pong_at cdef int i @@ -584,14 +584,14 @@ cdef class WSTransport: # If auto-ping is enabled and currently waiting for pong then # wait until we receive it and only then proceed with our own pings if self.auto_ping_expect_pong: - self.pong_received_at_fut = asyncio.get_running_loop().create_future() - await shield(self.pong_received_at_fut) + self.pong_received_at_future = asyncio.get_running_loop().create_future() + await shield(self.pong_received_at_future) for i in range(rounds): self.listener_proxy.send_user_specific_ping(self) - self.pong_received_at_fut = asyncio.get_running_loop().create_future() + self.pong_received_at_future = asyncio.get_running_loop().create_future() ping_at = picows_get_monotonic_time() - pong_at = await shield(self.pong_received_at_fut) + pong_at = await shield(self.pong_received_at_future) results.append(pong_at - ping_at) return results @@ -614,13 +614,13 @@ cdef class WSTransport: """ self.auto_ping_expect_pong = False - if self.pong_received_at_fut is not None: - self.pong_received_at_fut.set_result(picows_get_monotonic_time()) - self.pong_received_at_fut = None + if self.pong_received_at_future is not None: + self.pong_received_at_future.set_result(picows_get_monotonic_time()) + self.pong_received_at_future = None if self._log_debug_enabled: self._logger.log(PICOWS_DEBUG_LL, - "notify_user_specific_pong_received() for PONG(measure_ping_pong_latency), reset expect_pong") + "notify_user_specific_pong_received() for PONG(measure_roundtrip_latency), reset expect_pong") else: if self._log_debug_enabled: self._logger.log(PICOWS_DEBUG_LL, @@ -864,9 +864,9 @@ cdef class WSProtocol: if self._auto_ping_loop_task is not None and not self._auto_ping_loop_task.done(): self._auto_ping_loop_task.cancel() - if self.transport.pong_received_at_fut is not None: - self.transport.pong_received_at_fut.set_exception(ConnectionResetError()) - self.transport.pong_received_at_fut = None + if self.transport.pong_received_at_future is not None: + self.transport.pong_received_at_future.set_exception(ConnectionResetError()) + self.transport.pong_received_at_future = None self.transport._mark_disconnected() @@ -1045,8 +1045,8 @@ cdef class WSProtocol: if self._log_debug_enabled: self._logger.log(PICOWS_DEBUG_LL, "Send PING because no new data over the last %s seconds", self._auto_ping_idle_timeout) - if self.transport.pong_received_at_fut is not None: - # measure_ping_pong_latency is currently doing it's own ping-pongs + if self.transport.pong_received_at_future is not None: + # measure_roundtrip_latency is currently doing it's own ping-pongs # set _last_data_time to now and sleep self._last_data_time = picows_get_monotonic_time() continue @@ -1333,14 +1333,14 @@ cdef class WSProtocol: cdef inline _invoke_on_ws_frame(self, WSFrame frame): try: - if self._enable_auto_ping and self.transport.auto_ping_expect_pong or self.transport.pong_received_at_fut is not None: + if self._enable_auto_ping and self.transport.auto_ping_expect_pong or self.transport.pong_received_at_future is not None: if self.listener.is_user_specific_pong(frame): self.transport.auto_ping_expect_pong = False - if self.transport.pong_received_at_fut is not None: - self.transport.pong_received_at_fut.set_result(picows_get_monotonic_time()) - self.transport.pong_received_at_fut = None + if self.transport.pong_received_at_future is not None: + self.transport.pong_received_at_future.set_result(picows_get_monotonic_time()) + self.transport.pong_received_at_future = None if self._log_debug_enabled: - self._logger.log(PICOWS_DEBUG_LL, "Received PONG for the previously sent PING(measure_ping_pong_latency), reset expect_pong flag") + self._logger.log(PICOWS_DEBUG_LL, "Received PONG for the previously sent PING(measure_roundtrip_latency), reset expect_pong flag") else: if self._log_debug_enabled: self._logger.log(PICOWS_DEBUG_LL, "Received PONG for the previously sent PING(idle timeout), reset expect_pong flag") diff --git a/tests/test_autoping.py b/tests/test_autoping.py index 8204f19..6d94d8e 100644 --- a/tests/test_autoping.py +++ b/tests/test_autoping.py @@ -282,7 +282,7 @@ def on_ws_frame(self, transport, frame): auto_ping_idle_timeout=0.5, auto_ping_reply_timeout=0.5) async with async_timeout.timeout(2): - results = await transport.measure_ping_pong_latency(5) + results = await transport.measure_roundtrip_latency(5) assert len(results) == 5 for l in results: assert l > 0 and l < 1.0 @@ -290,7 +290,7 @@ def on_ws_frame(self, transport, frame): await asyncio.sleep(0.7) async with async_timeout.timeout(2): - results = await transport.measure_ping_pong_latency(5) + results = await transport.measure_roundtrip_latency(5) assert len(results) == 5 for l in results: assert l > 0 and l < 1.0 @@ -324,4 +324,4 @@ def send_user_specific_ping(self, transport): auto_ping_reply_timeout=0.5) async with async_timeout.timeout(TIMEOUT): with pytest.raises(ConnectionResetError): - await transport.measure_ping_pong_latency(5) + await transport.measure_roundtrip_latency(5) From 6bef39d3c9e798be03c2a0e1651f2bc116e7c9ee Mon Sep 17 00:00:00 2001 From: taras Date: Thu, 10 Oct 2024 16:56:07 +0200 Subject: [PATCH 5/7] Cleanups, add docs --- picows/picows.pyx | 22 +++++++++++++++------- tests/test_autoping.py | 8 ++++---- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/picows/picows.pyx b/picows/picows.pyx index ded7a71..0338d5c 100644 --- a/picows/picows.pyx +++ b/picows/picows.pyx @@ -9,7 +9,7 @@ import socket import struct import urllib.parse from ssl import SSLContext -from typing import cast, Tuple, Optional, Callable +from typing import cast, Tuple, Optional, Callable, List from multidict import CIMultiDict @@ -574,22 +574,30 @@ cdef class WSTransport: """ await asyncio.shield(self._disconnected_future) - async def measure_roundtrip_latency(self, int rounds): + async def measure_roundtrip_time(self, int rounds) -> List[float]: + """ + Coroutine that measures roundtrip time by running ping-pong. + + :param rounds: how many ping-pong rounds to do + :return: List of measured roundtrip times + """ + cdef double ping_at cdef double pong_at cdef int i cdef list results = [] cdef object shield = asyncio.shield + cdef object create_future = asyncio.get_running_loop().create_future # If auto-ping is enabled and currently waiting for pong then # wait until we receive it and only then proceed with our own pings if self.auto_ping_expect_pong: - self.pong_received_at_future = asyncio.get_running_loop().create_future() + self.pong_received_at_future = create_future() await shield(self.pong_received_at_future) for i in range(rounds): self.listener_proxy.send_user_specific_ping(self) - self.pong_received_at_future = asyncio.get_running_loop().create_future() + self.pong_received_at_future = create_future() ping_at = picows_get_monotonic_time() pong_at = await shield(self.pong_received_at_future) results.append(pong_at - ping_at) @@ -620,7 +628,7 @@ cdef class WSTransport: if self._log_debug_enabled: self._logger.log(PICOWS_DEBUG_LL, - "notify_user_specific_pong_received() for PONG(measure_roundtrip_latency), reset expect_pong") + "notify_user_specific_pong_received() for PONG(measure_roundtrip_time), reset expect_pong") else: if self._log_debug_enabled: self._logger.log(PICOWS_DEBUG_LL, @@ -1046,7 +1054,7 @@ cdef class WSProtocol: self._logger.log(PICOWS_DEBUG_LL, "Send PING because no new data over the last %s seconds", self._auto_ping_idle_timeout) if self.transport.pong_received_at_future is not None: - # measure_roundtrip_latency is currently doing it's own ping-pongs + # measure_roundtrip_time is currently doing it's own ping-pongs # set _last_data_time to now and sleep self._last_data_time = picows_get_monotonic_time() continue @@ -1340,7 +1348,7 @@ cdef class WSProtocol: self.transport.pong_received_at_future.set_result(picows_get_monotonic_time()) self.transport.pong_received_at_future = None if self._log_debug_enabled: - self._logger.log(PICOWS_DEBUG_LL, "Received PONG for the previously sent PING(measure_roundtrip_latency), reset expect_pong flag") + self._logger.log(PICOWS_DEBUG_LL, "Received PONG for the previously sent PING(measure_roundtrip_time), reset expect_pong flag") else: if self._log_debug_enabled: self._logger.log(PICOWS_DEBUG_LL, "Received PONG for the previously sent PING(idle timeout), reset expect_pong flag") diff --git a/tests/test_autoping.py b/tests/test_autoping.py index 6d94d8e..bfde7eb 100644 --- a/tests/test_autoping.py +++ b/tests/test_autoping.py @@ -257,7 +257,7 @@ def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame): @pytest.mark.parametrize("use_notify", [False, True], ids=["dont_use_notify", "use_notify"]) @pytest.mark.parametrize("with_auto_ping", [False, True], ids=["no_auto_ping", "with_auto_ping"]) -async def test_roundtrip_latency(use_notify, with_auto_ping): +async def test_roundtrip_time(use_notify, with_auto_ping): class ServerClientListener(picows.WSListener): def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame): if frame.msg_type == picows.WSMsgType.PING: @@ -282,7 +282,7 @@ def on_ws_frame(self, transport, frame): auto_ping_idle_timeout=0.5, auto_ping_reply_timeout=0.5) async with async_timeout.timeout(2): - results = await transport.measure_roundtrip_latency(5) + results = await transport.measure_roundtrip_time(5) assert len(results) == 5 for l in results: assert l > 0 and l < 1.0 @@ -290,7 +290,7 @@ def on_ws_frame(self, transport, frame): await asyncio.sleep(0.7) async with async_timeout.timeout(2): - results = await transport.measure_roundtrip_latency(5) + results = await transport.measure_roundtrip_time(5) assert len(results) == 5 for l in results: assert l > 0 and l < 1.0 @@ -324,4 +324,4 @@ def send_user_specific_ping(self, transport): auto_ping_reply_timeout=0.5) async with async_timeout.timeout(TIMEOUT): with pytest.raises(ConnectionResetError): - await transport.measure_roundtrip_latency(5) + await transport.measure_roundtrip_time(5) From ad87cd0aac4d5ce6eb3c6b8d67072462d09e55da Mon Sep 17 00:00:00 2001 From: taras Date: Thu, 10 Oct 2024 17:10:36 +0200 Subject: [PATCH 6/7] Add an example how to test okx rtt --- README.rst | 1 + examples/okx_roundtrip_time.py | 38 ++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 examples/okx_roundtrip_time.py diff --git a/README.rst b/README.rst index b44b729..73c56d1 100644 --- a/README.rst +++ b/README.rst @@ -152,6 +152,7 @@ Features * Provide Cython .pxd for efficient integration of user Cythonized code with picows * Ability to check if a frame is the last one in the receiving buffer * Auto ping-pong with an option to customize ping/pong messages. +* Convenient method to measure websocket roundtrip trip time using ping/pong messages. Contributing / Building From Source =================================== diff --git a/examples/okx_roundtrip_time.py b/examples/okx_roundtrip_time.py new file mode 100644 index 0000000..38959f6 --- /dev/null +++ b/examples/okx_roundtrip_time.py @@ -0,0 +1,38 @@ +import asyncio +import logging + +import picows +from picows import ws_connect, WSFrame, WSTransport, WSListener, WSMsgType, WSCloseCode + +EXPECTED_OKX_ROUNDTRIP_TIME = 0.1 + +class ClientListener(WSListener): + async def check_okx_roundtrip_time(self, transport: picows.WSTransport): + rtts = await transport.measure_roundtrip_time(5) + if min(rtts) < EXPECTED_OKX_ROUNDTRIP_TIME: + print(f"Minimal rtt {min(rtts):.3f} satisfies required {EXPECTED_OKX_ROUNDTRIP_TIME:.3f}") + else: + print(f"Minimal rtt {min(rtts):.3f} DOES NOT satisfies required {EXPECTED_OKX_ROUNDTRIP_TIME:.3f}, disconnect", + min(rtts), EXPECTED_OKX_ROUNDTRIP_TIME) + transport.disconnect() + + def send_user_specific_ping(self, transport: picows.WSTransport): + transport.send(picows.WSMsgType.TEXT, b"ping") + + def is_user_specific_pong(self, frame: picows.WSFrame): + return frame.msg_type == picows.WSMsgType.TEXT and frame.get_payload_as_memoryview() == b"pong" + + def on_ws_connected(self, transport: WSTransport): + asyncio.get_running_loop().create_task(self.check_okx_roundtrip_time(transport)) + + +async def main(url): + while True: + (transport, client) = await ws_connect(ClientListener, url) + await transport.wait_disconnected() + await asyncio.sleep(5) + + +if __name__ == '__main__': + logging.basicConfig(level=9) + asyncio.run(main("wss://ws.okx.com:8443/ws/v5/public")) From 1ad315d00f3130b0dc97ee19030b31ebbb1ead7e Mon Sep 17 00:00:00 2001 From: taras Date: Thu, 10 Oct 2024 17:22:05 +0200 Subject: [PATCH 7/7] Added docs --- docs/source/guides.rst | 10 ++++++++++ picows/picows.pyx | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/docs/source/guides.rst b/docs/source/guides.rst index 83b0334..66ad929 100644 --- a/docs/source/guides.rst +++ b/docs/source/guides.rst @@ -87,6 +87,16 @@ it does not handle replying to incoming ``PING`` frames. ... +Measuring/checking round-trip time +---------------------------------- +**picows** allows to conveniently measure round-trip time to a remote peer using +:any:`measure_roundtrip_time`. This is done by sending PING request multiple +times and measuring response delay. + +Checkout an `example `_ +of how to measure RTT to a popular OKX crypto-currency exchange and initiate +reconnect if it doesn't satisfy a predefined threshold. + Message fragmentation --------------------- In the WebSocket protocol, there is a distinction between messages and frames. diff --git a/picows/picows.pyx b/picows/picows.pyx index 0338d5c..26fab89 100644 --- a/picows/picows.pyx +++ b/picows/picows.pyx @@ -579,7 +579,7 @@ cdef class WSTransport: Coroutine that measures roundtrip time by running ping-pong. :param rounds: how many ping-pong rounds to do - :return: List of measured roundtrip times + :return: list of measured roundtrip times """ cdef double ping_at