diff --git a/libudpard/udpard.c b/libudpard/udpard.c index b2879a8..c10db61 100644 --- a/libudpard/udpard.c +++ b/libudpard/udpard.c @@ -38,6 +38,7 @@ static const byte_t ByteMask = 0xFFU; #define RX_SLOT_COUNT 2 #define TIMESTAMP_UNSET UINT64_MAX #define FRAME_INDEX_UNSET UINT32_MAX +#define TRANSFER_ID_UNSET UINT64_MAX typedef struct { @@ -834,10 +835,10 @@ static inline void rxFragmentFree(struct UdpardFragment* const head, /// (this is possible because all common CRC functions are linear). typedef struct { - UdpardMicrosecond ts_usec; ///< Timestamp of the earliest frame; TIMESTAMP_UNSET initially. - UdpardTransferID transfer_id; - uint32_t max_index; ///< Maximum observed frame index in this transfer (so far); zero initially. - uint32_t eot_index; ///< Frame index where the EOT flag was observed; FRAME_INDEX_UNSET initially. + UdpardMicrosecond ts_usec; ///< Timestamp of the earliest frame; TIMESTAMP_UNSET upon restart. + UdpardTransferID transfer_id; ///< When first constructed, this shall be set to UINT64_MAX (unreachable value). + uint32_t max_index; ///< Maximum observed frame index in this transfer (so far); zero upon restart. + uint32_t eot_index; ///< Frame index where the EOT flag was observed; FRAME_INDEX_UNSET upon restart. uint32_t accepted_frames; ///< Number of frames accepted so far. size_t payload_size; RxFragmentTreeNode* fragments; @@ -1143,6 +1144,7 @@ static inline int_fast8_t rxSlotAccept(RxSlot* const self, finish: if (restart) { + // Increment is necessary to weed out duplicate transfers. rxSlotRestart(self, self->transfer_id + 1U, memory_fragment, memory_payload); } if (release) @@ -1185,13 +1187,21 @@ static inline int_fast8_t rxIfaceAccept(RxIface* const self, // While we're at it, find the oldest slot as a replacement candidate. for (uint_fast8_t i = 0; i < RX_SLOT_COUNT; i++) { - is_future_tid = is_future_tid && (frame.meta.transfer_id > self->slots[i].transfer_id); - if (self->slots[i].transfer_id < victim->transfer_id) + RxSlot* const it = &self->slots[i]; + is_future_tid = is_future_tid && ((it->transfer_id < frame.meta.transfer_id) || // Keep state if unused. + (it->transfer_id == TRANSFER_ID_UNSET)); + // The timestamp is UNSET when the slot is waiting for the next transfer (or unused -- special case). + // Such slots are the best candidates for replacement because reusing them does not cause loss of + // transfers that are in the process of being reassembled. If there are no such slots, we must + // sacrifice the one whose first frame has arrived the longest time ago. + if ((it->ts_usec == TIMESTAMP_UNSET) || + ((victim->ts_usec != TIMESTAMP_UNSET) && (it->ts_usec < victim->ts_usec))) { - victim = &self->slots[i]; + victim = it; } } - const bool is_tid_timeout = (ts_usec - self->ts_usec) > transfer_id_timeout_usec; + const bool is_tid_timeout = (self->ts_usec == TIMESTAMP_UNSET) || // + ((ts_usec - self->ts_usec) > transfer_id_timeout_usec); if (is_tid_timeout || is_future_tid) { rxSlotRestart(victim, frame.meta.transfer_id, memory_fragment, memory_payload); @@ -1219,6 +1229,7 @@ static inline int_fast8_t rxIfaceAccept(RxIface* const self, memory_payload); if (result > 0) // Transfer successfully received, populate the transfer descriptor for the client. { + self->ts_usec = ts; received_transfer->timestamp_usec = ts; received_transfer->priority = frame.meta.priority; received_transfer->source_node_id = frame.meta.src_node_id; @@ -1243,7 +1254,7 @@ static inline void rxIfaceInit(RxIface* const self, for (uint_fast8_t i = 0; i < RX_SLOT_COUNT; i++) { self->slots[i].fragments = NULL; - rxSlotRestart(&self->slots[i], 0, memory_fragment, memory_payload); + rxSlotRestart(&self->slots[i], TRANSFER_ID_UNSET, memory_fragment, memory_payload); } } diff --git a/tests/src/test_intrusive_rx.c b/tests/src/test_intrusive_rx.c index 0d7c15f..a50b4ee 100644 --- a/tests/src/test_intrusive_rx.c +++ b/tests/src/test_intrusive_rx.c @@ -1018,7 +1018,288 @@ static void testIfaceAcceptA(void) instrumentedAllocatorNew(&mem_payload); RxIface iface; rxIfaceInit(&iface, &mem_fragment.base, &mem_payload.base); - (void) makeRxFrameString; + TEST_ASSERT_EQUAL(TIMESTAMP_UNSET, iface.ts_usec); + for (size_t i = 0; i < RX_SLOT_COUNT; i++) + { + TEST_ASSERT_EQUAL(TIMESTAMP_UNSET, iface.slots[i].ts_usec); + TEST_ASSERT_EQUAL(TRANSFER_ID_UNSET, iface.slots[i].transfer_id); + TEST_ASSERT_EQUAL(0, iface.slots[i].max_index); + TEST_ASSERT_EQUAL(FRAME_INDEX_UNSET, iface.slots[i].eot_index); + TEST_ASSERT_EQUAL(0, iface.slots[i].accepted_frames); + TEST_ASSERT_EQUAL(0, iface.slots[i].payload_size); + TEST_ASSERT_NULL(iface.slots[i].fragments); + } + struct UdpardRxTransfer transfer = {0}; + + // === TRANSFER === + // A simple single-frame transfer successfully accepted. + TEST_ASSERT_EQUAL(1, + rxIfaceAccept(&iface, + 1234567890, + makeRxFrameString(&mem_payload.base, // + (TransferMetadata){.priority = UdpardPriorityHigh, + .src_node_id = 1234, + .dst_node_id = UDPARD_NODE_ID_UNSET, + .data_specifier = 0x1234, + .transfer_id = 0x1122334455667788U}, + 0, + true, + "I am a tomb." + "\x1F\\\xCDs"), + &transfer, + 1000, + UDPARD_DEFAULT_TRANSFER_ID_TIMEOUT_USEC, + &mem_fragment.base, + &mem_payload.base)); + TEST_ASSERT_EQUAL(1, mem_payload.allocated_fragments); + TEST_ASSERT_EQUAL(0, mem_fragment.allocated_fragments); // Head fragment is not heap-allocated. + // Check the transfer we just accepted. + TEST_ASSERT_EQUAL(1234567890, transfer.timestamp_usec); + TEST_ASSERT_EQUAL(UdpardPriorityHigh, transfer.priority); + TEST_ASSERT_EQUAL(1234, transfer.source_node_id); + TEST_ASSERT_EQUAL(0x1122334455667788U, transfer.transfer_id); + TEST_ASSERT_EQUAL(12, transfer.payload_size); + TEST_ASSERT(compareStringWithPayload("I am a tomb.", transfer.payload.view)); + udpardFragmentFree(transfer.payload, &mem_fragment.base, &mem_payload.base); + TEST_ASSERT_EQUAL(0, mem_payload.allocated_fragments); + TEST_ASSERT_EQUAL(0, mem_fragment.allocated_fragments); + // Check the internal states of the iface. + TEST_ASSERT_EQUAL(1234567890, iface.ts_usec); + TEST_ASSERT_EQUAL(TRANSFER_ID_UNSET, iface.slots[0].transfer_id); // Still unused. + TEST_ASSERT_EQUAL(0x1122334455667789U, iface.slots[1].transfer_id); // Incremented. + + // === TRANSFER === + // Send a duplicate and ensure it is rejected. + TEST_ASSERT_EQUAL(0, // No transfer accepted. + rxIfaceAccept(&iface, + 1234567891, // different timestamp but ignored anyway + makeRxFrameString(&mem_payload.base, // + (TransferMetadata){.priority = UdpardPriorityHigh, + .src_node_id = 1234, + .dst_node_id = UDPARD_NODE_ID_UNSET, + .data_specifier = 0x1234, + .transfer_id = 0x1122334455667788U}, + 0, + true, + "I am a tomb." + "\x1F\\\xCDs"), + &transfer, + 1000, + UDPARD_DEFAULT_TRANSFER_ID_TIMEOUT_USEC, + &mem_fragment.base, + &mem_payload.base)); + TEST_ASSERT_EQUAL(0, mem_payload.allocated_fragments); + TEST_ASSERT_EQUAL(0, mem_fragment.allocated_fragments); + // Check the internal states of the iface. + TEST_ASSERT_EQUAL(1234567890, iface.ts_usec); // same old timestamp + TEST_ASSERT_EQUAL(TRANSFER_ID_UNSET, iface.slots[0].transfer_id); // Still unused. + TEST_ASSERT_EQUAL(0x1122334455667789U, iface.slots[1].transfer_id); // good ol' transfer id + + // === TRANSFER === + // Send a non-duplicate transfer with an invalid CRC using an in-sequence (matching) transfer-ID. + TEST_ASSERT_EQUAL(0, // No transfer accepted. + rxIfaceAccept(&iface, + 1234567892, // different timestamp but ignored anyway + makeRxFrameString(&mem_payload.base, // + (TransferMetadata){.priority = UdpardPriorityHigh, + .src_node_id = 1234, + .dst_node_id = UDPARD_NODE_ID_UNSET, + .data_specifier = 0x1234, + .transfer_id = 0x1122334455667789U}, + 0, + true, + "I am a tomb." + "No CRC here."), + &transfer, + 1000, + UDPARD_DEFAULT_TRANSFER_ID_TIMEOUT_USEC, + &mem_fragment.base, + &mem_payload.base)); + TEST_ASSERT_EQUAL(0, mem_payload.allocated_fragments); + TEST_ASSERT_EQUAL(0, mem_fragment.allocated_fragments); + // Check the internal states of the iface. + TEST_ASSERT_EQUAL(1234567890, iface.ts_usec); // same old timestamp + TEST_ASSERT_EQUAL(TRANSFER_ID_UNSET, iface.slots[0].transfer_id); // Still unused. + TEST_ASSERT_EQUAL(0x112233445566778AU, iface.slots[1].transfer_id); // Incremented. + + // === TRANSFER === + // Send a non-duplicate transfer with an invalid CRC using an out-of-sequence (non-matching) transfer-ID. + // Transfer-ID jumps forward, no existing slot; will use the second one. + TEST_ASSERT_EQUAL(0, // No transfer accepted. + rxIfaceAccept(&iface, + 1234567893, // different timestamp but ignored anyway + makeRxFrameString(&mem_payload.base, // + (TransferMetadata){.priority = UdpardPriorityHigh, + .src_node_id = 1234, + .dst_node_id = UDPARD_NODE_ID_UNSET, + .data_specifier = 0x1234, + .transfer_id = 0x1122334455667790U}, + 0, + true, + "I am a tomb." + "No CRC here, #2."), + &transfer, + 1000, + UDPARD_DEFAULT_TRANSFER_ID_TIMEOUT_USEC, + &mem_fragment.base, + &mem_payload.base)); + TEST_ASSERT_EQUAL(0, mem_payload.allocated_fragments); + TEST_ASSERT_EQUAL(0, mem_fragment.allocated_fragments); + // Check the internal states of the iface. + TEST_ASSERT_EQUAL(1234567890, iface.ts_usec); // same old timestamp + TEST_ASSERT_EQUAL(TRANSFER_ID_UNSET, iface.slots[0].transfer_id); // Still unused. + TEST_ASSERT_EQUAL(0x1122334455667791U, iface.slots[1].transfer_id); // Replaced the old one, it was unneeded. + + // === TRANSFER === (x2) + // Send two interleaving multi-frame out-of-order transfers with duplicate frames: + // B1 A2 A0 B0 A1 + TEST_ASSERT_EQUAL(0, mem_payload.allocated_fragments); + TEST_ASSERT_EQUAL(0, mem_fragment.allocated_fragments); + // B1 + TEST_ASSERT_EQUAL(0, + rxIfaceAccept(&iface, + 2000000010, // Transfer-ID timeout. + makeRxFrameString(&mem_payload.base, // + (TransferMetadata){.priority = UdpardPrioritySlow, + .src_node_id = 2222, + .dst_node_id = UDPARD_NODE_ID_UNSET, + .data_specifier = 0x2222, + .transfer_id = 1001U}, + 1, + true, + "B1" + "g\x8D\x9A\xD7"), + &transfer, + 1000, + UDPARD_DEFAULT_TRANSFER_ID_TIMEOUT_USEC, + &mem_fragment.base, + &mem_payload.base)); + TEST_ASSERT_EQUAL(1234567890, iface.ts_usec); // same old timestamp + TEST_ASSERT_EQUAL(TRANSFER_ID_UNSET, iface.slots[0].transfer_id); // Still unused. + TEST_ASSERT_EQUAL(1001, iface.slots[1].transfer_id); // Replaced the old one, it was unneeded. + TEST_ASSERT_EQUAL(1, mem_payload.allocated_fragments); + TEST_ASSERT_EQUAL(1, mem_fragment.allocated_fragments); + // A2 + TEST_ASSERT_EQUAL(0, + rxIfaceAccept(&iface, + 2000000020, + makeRxFrameString(&mem_payload.base, // + (TransferMetadata){.priority = UdpardPriorityHigh, + .src_node_id = 1111, + .dst_node_id = UDPARD_NODE_ID_UNSET, + .data_specifier = 0x1111, + .transfer_id = 1000U}, + 2, + true, + "A2" + "v\x1E\xBD]"), + &transfer, + 1000, + UDPARD_DEFAULT_TRANSFER_ID_TIMEOUT_USEC, + &mem_fragment.base, + &mem_payload.base)); + TEST_ASSERT_EQUAL(1234567890, iface.ts_usec); // same old timestamp + TEST_ASSERT_EQUAL(1000, iface.slots[0].transfer_id); // Used for A because the other one is taken. + TEST_ASSERT_EQUAL(1001, iface.slots[1].transfer_id); // Keeps B because it is in-progress, can't discard. + TEST_ASSERT_EQUAL(2, mem_payload.allocated_fragments); + TEST_ASSERT_EQUAL(2, mem_fragment.allocated_fragments); + // A0 + TEST_ASSERT_EQUAL(0, + rxIfaceAccept(&iface, + 2000000030, + makeRxFrameString(&mem_payload.base, // + (TransferMetadata){.priority = UdpardPriorityHigh, + .src_node_id = 1111, + .dst_node_id = UDPARD_NODE_ID_UNSET, + .data_specifier = 0x1111, + .transfer_id = 1000U}, + 0, + false, + "A0"), + &transfer, + 1000, + UDPARD_DEFAULT_TRANSFER_ID_TIMEOUT_USEC, + &mem_fragment.base, + &mem_payload.base)); + TEST_ASSERT_EQUAL(1234567890, iface.ts_usec); // same old timestamp + TEST_ASSERT_EQUAL(1000, iface.slots[0].transfer_id); + TEST_ASSERT_EQUAL(1001, iface.slots[1].transfer_id); + TEST_ASSERT_EQUAL(3, mem_payload.allocated_fragments); + TEST_ASSERT_EQUAL(3, mem_fragment.allocated_fragments); + // B0 + TEST_ASSERT_EQUAL(1, + rxIfaceAccept(&iface, + 2000000040, + makeRxFrameString(&mem_payload.base, // + (TransferMetadata){.priority = UdpardPrioritySlow, + .src_node_id = 2222, + .dst_node_id = UDPARD_NODE_ID_UNSET, + .data_specifier = 0x2222, + .transfer_id = 1001U}, + 0, + false, + "B0"), + &transfer, + 1000, + UDPARD_DEFAULT_TRANSFER_ID_TIMEOUT_USEC, + &mem_fragment.base, + &mem_payload.base)); + // TRANSFER B RECEIVED, check it. + TEST_ASSERT_EQUAL(2000000010, iface.ts_usec); + TEST_ASSERT_EQUAL(1000, iface.slots[0].transfer_id); + TEST_ASSERT_EQUAL(1002, iface.slots[1].transfer_id); // Incremented to meet the next transfer. + TEST_ASSERT_EQUAL(4, mem_payload.allocated_fragments); + TEST_ASSERT_EQUAL(3, mem_fragment.allocated_fragments); // One fragment freed because of the head optimization. + // Check the payload. + TEST_ASSERT_EQUAL(2000000010, transfer.timestamp_usec); + TEST_ASSERT_EQUAL(UdpardPrioritySlow, transfer.priority); + TEST_ASSERT_EQUAL(2222, transfer.source_node_id); + TEST_ASSERT_EQUAL(1001, transfer.transfer_id); + TEST_ASSERT_EQUAL(4, transfer.payload_size); + TEST_ASSERT(compareStringWithPayload("B0", transfer.payload.view)); + TEST_ASSERT_NOT_NULL(transfer.payload.next); + TEST_ASSERT(compareStringWithPayload("B1", transfer.payload.next->view)); + TEST_ASSERT_NULL(transfer.payload.next->next); + udpardFragmentFree(transfer.payload, &mem_fragment.base, &mem_payload.base); + TEST_ASSERT_EQUAL(2, mem_payload.allocated_fragments); // Only the remaining A0 A2 are left. + TEST_ASSERT_EQUAL(2, mem_fragment.allocated_fragments); + // A1 + TEST_ASSERT_EQUAL(1, + rxIfaceAccept(&iface, + 2000000050, + makeRxFrameString(&mem_payload.base, // + (TransferMetadata){.priority = UdpardPriorityHigh, + .src_node_id = 1111, + .dst_node_id = UDPARD_NODE_ID_UNSET, + .data_specifier = 0x1111, + .transfer_id = 1000U}, + 1, + false, + "A1"), + &transfer, + 1000, + UDPARD_DEFAULT_TRANSFER_ID_TIMEOUT_USEC, + &mem_fragment.base, + &mem_payload.base)); + // TRANSFER A RECEIVED, check it. + TEST_ASSERT_EQUAL(2000000020, iface.ts_usec); // same old timestamp + TEST_ASSERT_EQUAL(1001, iface.slots[0].transfer_id); // Incremented to meet the next transfer. + TEST_ASSERT_EQUAL(1002, iface.slots[1].transfer_id); + // Check the payload. + TEST_ASSERT_EQUAL(2000000020, transfer.timestamp_usec); + TEST_ASSERT_EQUAL(UdpardPriorityHigh, transfer.priority); + TEST_ASSERT_EQUAL(1111, transfer.source_node_id); + TEST_ASSERT_EQUAL(1000, transfer.transfer_id); + TEST_ASSERT_EQUAL(6, transfer.payload_size); + TEST_ASSERT(compareStringWithPayload("A0", transfer.payload.view)); + TEST_ASSERT_NOT_NULL(transfer.payload.next); + TEST_ASSERT(compareStringWithPayload("A1", transfer.payload.next->view)); + TEST_ASSERT_NOT_NULL(transfer.payload.next->next); + TEST_ASSERT(compareStringWithPayload("A2", transfer.payload.next->next->view)); + TEST_ASSERT_NULL(transfer.payload.next->next->next); + udpardFragmentFree(transfer.payload, &mem_fragment.base, &mem_payload.base); + TEST_ASSERT_EQUAL(0, mem_payload.allocated_fragments); + TEST_ASSERT_EQUAL(0, mem_fragment.allocated_fragments); } void setUp(void) {}