Skip to content

Commit

Permalink
Fix race condition (#80)
Browse files Browse the repository at this point in the history
The broker was able to respond faster than we added the qos1 packet to
the persistence store.

This lead missed acks (`unmatched packet id: ...`) and eventually
stalled the pipeline since the client had too many unacked packets.


Co-authored-by: Kasper Lund <[email protected]>

---------

Co-authored-by: Kasper Lund <[email protected]>
  • Loading branch information
floitsch and kasperl authored Nov 8, 2023
1 parent 7cb455d commit 7bdac1a
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 10 deletions.
63 changes: 55 additions & 8 deletions src/full_client.toit
Original file line number Diff line number Diff line change
Expand Up @@ -523,13 +523,52 @@ class Session_:

options /SessionOptions

static NOT_ACKED_ ::= 0
static ACKED_ ::= 1

/**
Keeps track of ids that haven't been given to the persistence store yet
but are in the process of being sent to the broker.
Once the sending is done without an exception, the id is removed from this
set and given to the persistence store.
Depending on whether it was $ACKED_ or $NOT_ACKED_, the packet is then
immediately removed from the store again.
*/
ack_ids_to_hold_/Map ::= {:}

constructor .options .persistence_store_:

/**
Marks the given packet $id as being in the process of being sent to the broker.
The session holds the acks and doesn't immediately give it to the
persistence store.
*/
hold_ack_for id/int -> none:
ack_ids_to_hold_[id] = NOT_ACKED_

/**
Restores normal ack handling for the given packet $id.
*/
restore_ack_handling_for id/int -> none:
ack_ids_to_hold_.remove id

set_pending_ack topic/string payload/ByteArray --packet_id/int --retain/bool:
persistent := PersistedPacket topic payload --retain=retain --packet_id=packet_id
persistence_store_.add persistent
ack_ids_to_hold_.get packet_id --if_present=: | current_value |
if current_value == ACKED_:
// This packet was already acked.
// Inform the persistence store.
remove_pending packet_id
// Either way, stop treating acks for this id specially.
restore_ack_handling_for packet_id

handle_ack id/int [--if_absent] -> none:
if ack_ids_to_hold_.contains id:
ack_ids_to_hold_[id] = ACKED_
return

if not persistence_store_.remove_persisted_with_id id:
if_absent.call id

Expand Down Expand Up @@ -975,22 +1014,31 @@ class FullClient:
*/
publish topic/string payload/ByteArray --qos/int=1 --retain/bool=false -> none:
if topic == "" or topic.contains "+" or topic.contains "#": throw "INVALID_ARGUMENT"
if qos == 1:
if qos == 0:
send_publish_ topic payload --packet_id=null --qos=qos --retain=retain
else if qos == 1:
ack_received_signal_.wait: session_.pending_count < session_.options.max_inflight
packet_id := send_publish_ topic payload --qos=qos --retain=retain
if qos == 1:
session_.set_pending_ack topic payload --packet_id=packet_id --retain=retain
packet_id := session_.next_packet_id
// We need to tell the session_ to keep acks for this packet id before we
// call 'set_pending_ack'. Otherwise we have a race condition where the
// broker can send the ack before the session is ready for it.
session_.hold_ack_for packet_id
try:
send_publish_ topic payload --packet_id=packet_id --qos=qos --retain=retain
session_.set_pending_ack topic payload --packet_id=packet_id --retain=retain
finally:
// Either 'set_pending_ack' was called, or we had an exception.
// Either way we don't need to do special ack-handling for this packet id anymore.
session_.restore_ack_handling_for packet_id

/**
Sends a $PublishPacket with the given $topic, $payload, $qos and $retain.
If $qos is 1, then allocates a packet id and returns it.
*/
send_publish_ topic/string payload/ByteArray --qos/int --retain/bool -> int?:
send_publish_ topic/string payload/ByteArray --packet_id --qos/int --retain/bool -> none:
if qos != 0 and qos != 1: throw "INVALID_ARGUMENT"

packet_id := qos > 0 ? session_.next_packet_id : null

packet := PublishPacket
topic
payload
Expand All @@ -999,7 +1047,6 @@ class FullClient:
--packet_id=packet_id

send_ packet
return packet_id

/**
Subscribes to the given $topic with a max qos of $max_qos.
Expand Down
105 changes: 105 additions & 0 deletions tests/fast_ack_test.toit
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright (C) 2022 Toitware ApS.
// Use of this source code is governed by a Zero-Clause BSD license that can
// be found in the tests/LICENSE file.
import expect show *
import log
import mqtt
import mqtt.transport as mqtt
import mqtt.packets as mqtt
import monitor
import net

import .broker_internal
import .broker_mosquitto
import .transport
import .packet_test_client

/**
Tests that the client can handle acks that arrive fast. That is, where
the client is still in the 'send' routine of the transport.
*/
main args:
test_with_mosquitto := args.contains "--mosquitto"
log_level := log.DEBUG_LEVEL
logger := log.default.with_level log_level

run_test := : | create_transport/Lambda | test create_transport --logger=logger
if test_with_mosquitto: with_mosquitto --logger=logger run_test
else: with_internal_broker --logger=logger run_test

class TestPersistenceStore extends mqtt.MemoryPersistenceStore:
added := {}
removed := {}

add packet/mqtt.PersistedPacket -> none:
added.add packet.packet_id
super packet

remove_persisted_with_id packet_id/int -> bool:
removed.add packet_id
return super packet_id

test create_transport/Lambda --logger/log.Logger:
persistence_store := TestPersistenceStore
id := "persistence_client_id"

return_from_write := monitor.Channel 1
delay_is_active := false
create_test_transport := ::
test_transport := CallbackTestTransport create_transport.call
test_transport.on_after_write = ::
if delay_is_active: return_from_write.receive
test_transport

received_ack := monitor.Channel 1
read_filter := :: | packet/mqtt.Packet |
if packet is mqtt.PubAckPacket:
received_ack.send true
packet

with_packet_client create_test_transport
--client_id = id
--read_filter = read_filter
--persistence_store = persistence_store
--logger=logger: | client/mqtt.FullClient wait_for_idle/Lambda _ _ |

print "wait for idle"
wait_for_idle.call

client.publish "not_delayed" "payload".to_byte_array --qos=1
received_ack.receive
YIELD_COUNT ::= 10
YIELD_COUNT.repeat: yield
expect persistence_store.is_empty
// The ack should have made it to the persistence store.
expect persistence_store.is_empty
expect_equals 1 persistence_store.added.size

delay_is_active = true
publish_is_done := false
task::
client.publish "delayed" "payload".to_byte_array --qos=1
publish_is_done = true
// The delay is *after* we sent the message to the broker.
// We expect to get an ack.
received_ack.receive
// Allow the ack to propagate.
YIELD_COUNT.repeat: yield
// The publish is still in process.
expect_not publish_is_done
// At this point the persistence store hasn't received the packet
// nor the ack yet as the send routine is still running.
expect persistence_store.is_empty
expect_equals 1 persistence_store.added.size

return_from_write.send true
YIELD_COUNT.repeat: yield
// The publish is now done.
expect publish_is_done
// The ack should have made it to the persistence store.
expect persistence_store.is_empty
expect_equals 2 persistence_store.added.size
expect_equals 2 persistence_store.removed.size

delay_is_active = false
10 changes: 8 additions & 2 deletions tests/transport.toit
Original file line number Diff line number Diff line change
Expand Up @@ -282,17 +282,23 @@ class CallbackTestTransport implements mqtt.Transport:
on_reconnect /Lambda? := null
on_disconnect /Lambda? := null
on_write /Lambda? := null
on_after_write /Lambda? := null
on_read /Lambda? := null
on_after_read /Lambda? := null

constructor .wrapped_:

write bytes/ByteArray -> int:
if on_write: on_write.call bytes
return wrapped_.write bytes
result := wrapped_.write bytes
if on_after_write: on_after_write.call bytes result
return result

read -> ByteArray?:
if on_read: return on_read.call wrapped_
return wrapped_.read
result := wrapped_.read
if on_after_read: on_after_read.call result
return result

close -> none: wrapped_.close

Expand Down

0 comments on commit 7bdac1a

Please sign in to comment.