From a28fb72561a3fc261a65691128f829bb7577f6f1 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Fri, 23 Feb 2024 15:13:33 +0200 Subject: [PATCH] Add heartbeat() method to NoisyPublisher, fix flake8 issues --- posttroll/publisher.py | 8 +++++--- posttroll/tests/test_pubsub.py | 27 +++++++++++++++++++++++++-- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/posttroll/publisher.py b/posttroll/publisher.py index bdeec28..47fb738 100644 --- a/posttroll/publisher.py +++ b/posttroll/publisher.py @@ -23,7 +23,6 @@ """The publisher module gives high-level tools to publish messages on a port.""" -import os import logging import socket from datetime import datetime, timedelta @@ -106,8 +105,6 @@ def __init__(self, address, name="", min_port=None, max_port=None): self._heartbeat = None self._pub_lock = Lock() - - def start(self): """Start the publisher.""" self.publish_socket = get_context().socket(zmq.PUB) @@ -118,6 +115,7 @@ def start(self): return self def bind(self): + """Bind the port.""" # Check for port 0 (random port) u__ = urlsplit(self.destination) port = u__.port @@ -243,6 +241,10 @@ def close(self): """Alias for stop.""" self.stop() + def heartbeat(self, min_interval=0): + """Publish a heartbeat.""" + self._publisher.heartbeat(min_interval=min_interval) + def _get_publish_address(port, ip_address="*"): return "tcp://" + ip_address + ":" + str(port) diff --git a/posttroll/tests/test_pubsub.py b/posttroll/tests/test_pubsub.py index 2da79b9..a593bb2 100644 --- a/posttroll/tests/test_pubsub.py +++ b/posttroll/tests/test_pubsub.py @@ -307,8 +307,6 @@ def test_pub_unicode(self): def test_pub_minmax_port(self): """Test user defined port range.""" - import os - # Using environment variables to set port range # Try over a range of ports just in case the single port is reserved for port in range(40000, 50000): @@ -720,3 +718,28 @@ def _assert_tcp_keepalive(socket): def _assert_no_tcp_keepalive(socket): assert "TCP_KEEPALIVE" not in str(socket.setsockopt.mock_calls) + + +def test_noisypublisher_heartbeat(): + """Test that the heartbeat in the NoisyPublisher works.""" + from posttroll.ns import NameServer + from posttroll.publisher import NoisyPublisher + from posttroll.subscriber import Subscribe + + ns_ = NameServer() + thr = Thread(target=ns_.run) + thr.start() + + pub = NoisyPublisher("test") + pub.start() + time.sleep(0.2) + + with Subscribe("test", topics="/heartbeat/test", nameserver="localhost") as sub: + time.sleep(0.2) + pub.heartbeat(min_interval=10) + msg = next(sub.recv(1)) + assert msg.type == "beat" + assert msg.data == {'min_interval': 10} + pub.stop() + ns_.stop() + thr.join()