Skip to content

Commit

Permalink
Fixes #1652: Avoid repeated closing of receiver which could hang the …
Browse files Browse the repository at this point in the history
…test
  • Loading branch information
ganeshmurthy committed Oct 25, 2024
1 parent ae9c164 commit ea322ce
Showing 1 changed file with 29 additions and 16 deletions.
45 changes: 29 additions & 16 deletions tests/system_tests_stuck_deliveries.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,21 +163,30 @@ def __init__(self, sender_host, receiver_host, query_host, addr, dlv_count, stuc
self.receiver_conn = None
self.query_conn = None
self.error = None
self.timer = None
self.poll_timer = None
self.receiver = None
self.reply_receiver = None
self.proxy = None
self.query_sender = None
self.reply_addr = None
self.sender = None
self.link_closed = False
self.n_tx = 0
self.n_rx = 0
self.expected_stuck = 0
self.last_stuck = 0
self.deliveries_stuck = 0

def timeout(self):
self.error = "Timeout Expired - n_tx=%d, n_rx=%d, expected_stuck=%d last_stuck=%d" %\
(self.n_tx, self.n_rx, self.expected_stuck, self.last_stuck)
self.error = "Timeout Expired - n_tx=%d, n_rx=%d, expected_stuck=%d deliveries_stuck=%d" %\
(self.n_tx, self.n_rx, self.expected_stuck, self.deliveries_stuck)
self.sender_conn.close()
self.receiver_conn.close()
self.query_conn.close()
if self.poll_timer:
self.poll_timer.cancel()

def fail(self, error):
def stop_test(self, error):
self.error = error
self.sender_conn.close()
self.receiver_conn.close()
Expand All @@ -188,7 +197,6 @@ def fail(self, error):

def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.poll_timer = None
self.receiver_conn = event.container.connect(self.receiver_host)
self.query_conn = event.container.connect(self.query_host)
self.receiver = event.container.create_receiver(self.receiver_conn, self.addr)
Expand All @@ -209,8 +217,7 @@ def on_link_opened(self, event):
# is caused due to the first test failure, so this fix will
# fix the second failure
self.sender_conn = event.container.connect(self.sender_host)
self.sender = event.container.create_sender(self.sender_conn,
self.addr)
self.sender = event.container.create_sender(self.sender_conn, self.addr)

def on_sendable(self, event):
if event.sender == self.sender:
Expand All @@ -230,19 +237,25 @@ def on_message(self, event):
elif event.receiver == self.reply_receiver:
response = self.proxy.response(event.message)
self.accept(event.delivery)
self.last_stuck = response.results[0].deliveriesStuck
if self.last_stuck == self.expected_stuck:
if self.close_link:
self.receiver.close()
else:
for dlv in self.stuck_dlvs:
self.accept(dlv)
self.stuck_dlvs = []
self.deliveries_stuck = response.results[0].deliveriesStuck
if self.deliveries_stuck == self.expected_stuck:
# The deliveries stuck queried via management equals what we expected.
# The test has passed.
if self.expected_stuck != 0:
if self.close_link:
self.receiver.close()
else:
for dlv in self.stuck_dlvs:
self.accept(dlv)
# Once the deliveries have been accepted, we want to make sure that the number of stuck deliveries
# is back to zero before we quit this test.
if self.expected_stuck > 0:
self.query_stats(0)
else:
self.fail(None)
self.stop_test(None)
else:
# The deliveries stuck queried via management is not what we expected.
# We will keep querying until TIMEOUT seconds to see if our condition comes True.
self.poll_timer = event.reactor.schedule(0.5, PollTimeout(self))

def query_stats(self, expected_stuck):
Expand Down

0 comments on commit ea322ce

Please sign in to comment.