Skip to content

Commit

Permalink
Merge pull request #45 from oresat/ping-output
Browse files Browse the repository at this point in the history
Improve ping script output readability
  • Loading branch information
ThirteenFish authored Oct 27, 2024
2 parents 130a791 + b95f048 commit 42b2897
Showing 1 changed file with 176 additions and 75 deletions.
251 changes: 176 additions & 75 deletions scripts/edl_ping_loop.py
Original file line number Diff line number Diff line change
@@ -1,66 +1,179 @@
#!/usr/bin/env python3
"""Quick shell to manually send EDL commands."""
"""Sends EDL ping commands continuously, tracking responses"""

import os
import socket
import sys
from argparse import ArgumentParser
from threading import Thread
from time import monotonic, sleep, time
from collections import OrderedDict
from dataclasses import dataclass
from time import monotonic
from typing import Generator, Union

sys.path.insert(0, os.path.abspath(".."))

from oresat_c3.protocols.edl_command import EdlCommandCode, EdlCommandRequest
from oresat_c3.protocols.edl_packet import SRC_DEST_ORESAT, EdlPacket

sent = 0
recv = 0
loop = 0
last_ts = {}

class Timeout:
"""Tracks how long to sleep until the start of the next loop iteration.
def send_thread(address: tuple, hmac_key: bytes, seq_num: int, delay: float, verbose: bool):
"""Send ping thread"""
global sent # pylint: disable=W0603
global loop # pylint: disable=W0603
uplink_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
start_time = monotonic()
The idea is we'd like to wait until a specific time but the timer only accepts durations
and may wake up early. This class turns an absolute time (given in loops) into a relative
duration from now and makes it easy to resume the timer if woken up early. Should be created
close (in time) to the start of the loop.
while True:
loop += 1
seq_num += 1
seq_num &= 0xFF_FF_FF_FF
Parameters
----------
delay: duration in seconds of one loop iteration
"""

values = (loop,)
print(f"Request PING: {values} | seq_num: {seq_num}")
def __init__(self, delay: float):
self.start = monotonic()
self.delay = delay

try:
req = EdlCommandRequest(EdlCommandCode.PING, values)
req_packet = EdlPacket(req, seq_num, SRC_DEST_ORESAT)
req_message = req_packet.pack(hmac_key)
def next(self, loop: int) -> Generator[float, None, None]:
"""Generates a monotonically decreasing series of timeouts for the given loop iteration.
if verbose:
print(req_message.hex())

uplink_socket.sendto(req_message, address)
last_ts[loop] = time()
for i in last_ts:
if i > loop + 10:
del last_ts[loop]
sent += 1
except Exception: # pylint: disable=W0718
pass
Parameters
----------
loop: non-negative int giving the current loop iteration
"""

if delay > 0:
sleep(delay - ((monotonic() - start_time) % delay))
while (t := self.delay * loop + self.start - monotonic()) > 0:
yield t

print(f"Sent: {sent} | Recv: {recv} | Return: {100 - ((sent - recv) * 100) // sent}%\n")

class Link:
"""Manages sending and receiving pings, tracking various stats in the process
def main():
"""Loop EDL ping for testing."""
global recv # pylint: disable=W0603
Parameters
----------
host: address to send pings to, either an IP or a hostname
up_port: port on host to send to
down_port: local port to listen on
sequence_number: EDL sequence number to start on
hmac: EDL HMAC key
"""

def __init__(self, host: str, up_port: int, down_port: int, sequence_number: int, hmac: bytes):
self._uplink = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._uplink.connect((host, up_port))

self._downlink = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._downlink.bind((host if host in ["localhost", "127.0.0.1"] else "", down_port))

self.sent = 0
self.echo = 0
self.sequence_number = sequence_number
self.hmac = hmac
self.sent_times: OrderedDict[int, float] = OrderedDict()
self.last = 0

def send(self, value: int) -> bytes:
if not value > self.last:
raise ValueError("value must be monotonically increasing")
self.last = value

self.sequence_number = self.sequence_number + 1 & 0xFFFF_FFFF
request = EdlCommandRequest(EdlCommandCode.PING, (value,))
message = EdlPacket(request, self.sequence_number, SRC_DEST_ORESAT).pack(self.hmac)

self._uplink.send(message)
self.sent_times[value] = monotonic()
self.sent += 1
return message

@dataclass
class Invalid:
"""Return type for an abnormal ping response with value 'payload' and content 'raw'.
This could be caused by abnormal conditions (udp reorder/duplicate, bugs in c3, multiple
c3s responding, ...)
"""

payload: int
raw: bytes

@dataclass
class Lost:
"""Return type indicating that 'count' pings have been dropped"""

count: int

@dataclass
class Recv:
"""Return type for a successful ping response with latency 'delay' and content 'raw'"""

delay: float
raw: bytes

Result = Union[Invalid, Lost, Recv]

def recv(self, timeout: Generator[float, None, None]) -> Generator[Result, None, None]:
for t in timeout:
self._downlink.settimeout(t)
response = self._downlink.recv(4096)
payload = EdlPacket.unpack(response, self.hmac).payload.values[0]
t_recv = monotonic()

# self.sent_times.keys() are monotonic (not to be confused with the timestamps from
# the monotonic() clock) but not necessarily contiguous.
if payload not in self.sent_times:
yield self.Invalid(payload, response)
continue

lost = 0
while self.sent_times:
value, t_sent = self.sent_times.popitem(last=False)
if payload == value:
break
lost += 1

if lost > 0:
yield self.Lost(lost)
self.echo += 1
yield self.Recv(t_recv - t_sent, response)

def rate(self):
return 100 * self.echo // self.sent


def ping_loop(link: Link, timeout: Timeout, count: int, verbose: bool):
print("Loop | Seqn Sent [ Recv (Rate) Latency or Lost×]", end="", flush=True)
loop = 0
while count < 0 or loop < count:
loop += 1
print(f"\n{loop:4} |", end="", flush=True)

try:
message = link.send(loop)
if verbose:
print("\n↖", message.hex())
except ConnectionRefusedError:
print("Connection Refused: Uplink destination not available to receive packets")
continue
print(f"{link.sequence_number:4}# {link.sent:4}↖ ", end="", flush=True)

try:
for result in link.recv(timeout.next(loop)):
print(f"[{link.echo:4}↙ ({link.rate():3}%) ", end="", flush=True)
if isinstance(result, Link.Recv):
print(f"{int(result.delay * 1000):4}ms]", end="", flush=True)
if verbose:
print("\n↙", result.raw.hex())
elif isinstance(result, Link.Lost):
print(f"{result.count:4}× ]", end="", flush=True)
elif isinstance(result, Link.Invalid):
print(f"Unexpected payload {result.payload}, expected {loop}]")
if verbose:
print("\n↙", result.raw.hex())
except socket.timeout:
pass


def main():
parser = ArgumentParser("Send a EDL ping in a loop")
parser.add_argument(
"-o", "--host", default="localhost", help="address to use, default is localhost"
Expand Down Expand Up @@ -96,49 +209,37 @@ def main():
default="",
help="edl hmac, must be 32 bytes, default all zero",
)
parser.add_argument(
"-c",
"--count",
default=-1,
type=int,
help="send up to COUNT pings before stopping. Negative values are forever",
)

args = parser.parse_args()

if args.loop_delay < 0:
print(f"Invalid delay {args.loop_delay}, must be >= 0")
return

if args.hmac:
if len(args.hmac) != 64:
print("Invalid hmac, must be hex string of 32 bytes")
sys.exit(1)
else:
hmac_key = bytes.fromhex(args.hmac)
return
hmac_key = bytes.fromhex(args.hmac)
else:
hmac_key = b"\x00" * 32
hmac_key = bytes(32)

uplink_address = (args.host, args.uplink_port)
link = Link(args.host, args.uplink_port, args.downlink_port, args.sequence_number, hmac_key)
timeout = Timeout(args.loop_delay / 1000)

downlink_host = args.host if args.host in ["localhost", "127.0.0.1"] else ""
downlink_address = (downlink_host, args.downlink_port)
downlink_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
downlink_socket.bind(downlink_address)
downlink_socket.settimeout(1)

t = Thread(
target=send_thread,
args=(uplink_address, hmac_key, args.sequence_number, args.loop_delay / 1000, args.verbose),
daemon=True,
)
t.start()

while True:
try:
res_message = downlink_socket.recv(0xFF_FF)
if args.verbose:
print(res_message.hex())

res_packet = EdlPacket.unpack(res_message, hmac_key)
recv += 1

timediff = -1.0
if loop in last_ts:
timediff = time() - last_ts[loop]
print(f"Response PING: {res_packet.payload.values} | {int(timediff * 1000)} ms")
except KeyboardInterrupt:
break
except Exception: # pylint: disable=W0718
continue
try:
ping_loop(link, timeout, args.count, args.verbose)
except KeyboardInterrupt:
pass
finally:
print("\nNext sequence number:", link.sequence_number)


if __name__ == "__main__":
Expand Down

0 comments on commit 42b2897

Please sign in to comment.