Skip to content

Commit

Permalink
Finish the slot ejection tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pavel-kirienko committed Jul 26, 2023
1 parent 98c0f53 commit 77fdca1
Show file tree
Hide file tree
Showing 4 changed files with 373 additions and 46 deletions.
115 changes: 76 additions & 39 deletions libudpard/udpard.c
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,20 @@ typedef struct RxFragment
uint32_t frame_index;
} RxFragment;

static inline void rxFragmentFree(struct UdpardFragment* const head,
struct UdpardMemoryResource* const memory_fragment,
struct UdpardMemoryResource* const memory_payload)
{
struct UdpardFragment* handle = head;
while (handle != NULL)
{
struct UdpardFragment* const next = handle->next;
memFreePayload(memory_payload, handle->origin); // May be NULL, is okay.
memFree(memory_fragment, sizeof(RxFragment), handle);
handle = next;
}
}

/// Internally, the RX pipeline is arranged as follows:
///
/// - There is one port per subscription or an RPC-service listener. Within the port, there are N sessions,
Expand Down Expand Up @@ -943,7 +957,10 @@ static inline void rxSlotEjectFragment(RxFragment* const frag, RxSlotEjectContex
UDPARD_ASSERT((frag != NULL) && (ctx != NULL) && memIsValid(ctx->memory_fragment));
if (frag->tree.base.lr[0] != NULL)
{
rxSlotEjectFragment(((RxFragmentTreeNode*) frag->tree.base.lr[0])->this, ctx);
RxFragment* const child = ((RxFragmentTreeNode*) frag->tree.base.lr[0])->this;
UDPARD_ASSERT(child->frame_index < frag->frame_index);
UDPARD_ASSERT(child->tree.base.up == &frag->tree.base);
rxSlotEjectFragment(child, ctx);
}
const size_t fragment_size = frag->base.view.size;
frag->base.next = NULL; // Default state; may be overwritten.
Expand All @@ -965,17 +982,16 @@ static inline void rxSlotEjectFragment(RxFragment* const frag, RxSlotEjectContex
ctx->offset += fragment_size;
if (frag->tree.base.lr[1] != NULL)
{
rxSlotEjectFragment(((RxFragmentTreeNode*) frag->tree.base.lr[1])->this, ctx);
RxFragment* const child = ((RxFragmentTreeNode*) frag->tree.base.lr[1])->this;
UDPARD_ASSERT(child->frame_index > frag->frame_index);
UDPARD_ASSERT(child->tree.base.up == &frag->tree.base);
rxSlotEjectFragment(child, ctx);
}
// Drop the unneeded fragments and their handles after the sub-tree is fully traversed.
if (!retain)
{
memFree(ctx->memory_fragment, sizeof(RxFragment), frag);
memFreePayload(ctx->memory_payload, frag->base.origin);
frag->base.origin.data = NULL;
frag->base.origin.size = 0;
frag->base.view.data = NULL;
frag->base.view.size = 0;
memFree(ctx->memory_fragment, sizeof(RxFragment), frag);
}
}

Expand All @@ -990,52 +1006,48 @@ static inline void rxSlotEjectFragment(RxFragment* const frag, RxSlotEjectContex
/// After this function is invoked, the tree will be destroyed and cannot be used anymore;
/// hence, in the event of invalid transfer being received (bad CRC), the fragments will have to be freed
/// by traversing the linked list instead of the tree.
static inline int_fast8_t rxSlotEject(RxSlot* const self,
const RxFrame frame,
struct UdpardRxTransfer* const rx_transfer,
const size_t extent,
struct UdpardMemoryResource* const memory_fragment,
struct UdpardMemoryResource* const memory_payload)
///
/// The payload shall contain at least the transfer CRC, so the minimum size is TRANSFER_CRC_SIZE_BYTES.
/// There shall be at least one fragment (because a Cyphal transfer contains at least one frame).
///
/// The return value indicates whether the transfer is valid (CRC is correct).
static inline bool rxSlotEject(size_t* const out_payload_size,
struct UdpardFragment* const out_payload_head,
RxFragmentTreeNode* const fragment_tree,
const size_t received_total_size, // With CRC.
const size_t extent,
struct UdpardMemoryResource* const memory_fragment,
struct UdpardMemoryResource* const memory_payload)
{
int_fast8_t result = 0;
UDPARD_ASSERT((received_total_size >= TRANSFER_CRC_SIZE_BYTES) && (fragment_tree != NULL) &&
(out_payload_size != NULL) && (out_payload_head != NULL));
bool result = false;
RxSlotEjectContext eject_ctx = {
.head = NULL,
.predecessor = NULL,
.crc = TRANSFER_CRC_INITIAL,
.retain_size = smaller(self->payload_size - TRANSFER_CRC_SIZE_BYTES, extent),
.retain_size = smaller(received_total_size - TRANSFER_CRC_SIZE_BYTES, extent),
.offset = 0,
.memory_fragment = memory_fragment,
.memory_payload = memory_payload,
};
rxSlotEjectFragment(self->fragments->this, &eject_ctx);
self->fragments = NULL; // Ejection invalidates the tree.
UDPARD_ASSERT(eject_ctx.offset == self->payload_size);
rxSlotEjectFragment(fragment_tree->this, &eject_ctx);
UDPARD_ASSERT(eject_ctx.offset == received_total_size); // Ensure we have traversed the entire tree.
if (TRANSFER_CRC_RESIDUE_BEFORE_OUTPUT_XOR == eject_ctx.crc)
{
result = 1;
rx_transfer->timestamp_usec = self->ts_usec;
rx_transfer->priority = frame.meta.priority;
rx_transfer->source_node_id = frame.meta.src_node_id;
rx_transfer->transfer_id = frame.meta.transfer_id;
rx_transfer->payload_size = eject_ctx.retain_size;
rx_transfer->payload = (eject_ctx.head != NULL)
? (*eject_ctx.head) // Slice. The derived type fields are not needed.
: (struct UdpardFragment){.next = NULL, .view = {0, NULL}, .origin = {0, NULL}};
result = true;
*out_payload_size = eject_ctx.retain_size;
*out_payload_head = (eject_ctx.head != NULL)
? (*eject_ctx.head) // Slice off the derived type fields as they are not needed.
: (struct UdpardFragment){.next = NULL, .view = {0, NULL}, .origin = {0, NULL}};
// This is the single-frame transfer optimization suggested by Scott: we free the first fragment handle
// early by moving the contents into the rx_transfer structure by value.
// No need to free the payload buffer because it has been transferred to the transfer.
memFree(memory_fragment, sizeof(RxFragment), eject_ctx.head); // May be empty.
}
else // The transfer turned out to be invalid. We have to free the fragments. Can't use the tree anymore.
{
struct UdpardFragment* handle = eject_ctx.head;
while (handle != NULL)
{
struct UdpardFragment* const next = handle->next;
memFreePayload(memory_payload, handle->origin);
memFree(memory_fragment, sizeof(RxFragment), handle);
handle = next;
}
rxFragmentFree(eject_ctx.head, memory_fragment, memory_payload);
}
return result;
}
Expand Down Expand Up @@ -1072,15 +1084,14 @@ static inline int_fast8_t rxSlotUpdate(RxSlot* const self,
goto finish;
}
// SECOND: Insert the fragment into the fragment tree. If it already exists, drop and free the duplicate.
UDPARD_ASSERT((self->max_index <= self->eot_index) && (self->accepted_frames < self->eot_index));
UDPARD_ASSERT((self->max_index <= self->eot_index) && (self->accepted_frames <= self->eot_index));
RxSlotUpdateContext update_ctx = {.frame_index = frame.index,
.accepted = false,
.memory_fragment = memory_fragment};
RxFragmentTreeNode* const frag = (RxFragmentTreeNode*) cavlSearch((struct UdpardTreeNode**) &self->fragments, //
&update_ctx,
&rxSlotFragmentSearch,
&rxSlotFragmentFactory);
UDPARD_ASSERT((self->max_index <= self->eot_index) && (self->accepted_frames <= self->eot_index));
if (frag == NULL)
{
UDPARD_ASSERT(!update_ctx.accepted);
Expand All @@ -1089,6 +1100,7 @@ static inline int_fast8_t rxSlotUpdate(RxSlot* const self,
// No restart because there is hope that there will be enough memory when we receive a duplicate.
goto finish;
}
UDPARD_ASSERT(self->max_index <= self->eot_index);
if (update_ctx.accepted)
{
UDPARD_ASSERT(frag->this->frame_index == frame.index);
Expand All @@ -1099,13 +1111,30 @@ static inline int_fast8_t rxSlotUpdate(RxSlot* const self,
}
// THIRD: Detect transfer completion. If complete, eject the payload from the fragment tree and check its CRC.
UDPARD_ASSERT(self->fragments != NULL);
if (self->accepted_frames == self->eot_index)
if (self->accepted_frames > self->eot_index) // Mind the off-by-one: cardinal vs. ordinal.
{
// This transfer is done but we don't know yet if it's valid. Either way, we need to prepare for the next one.
restart = true;
if (self->payload_size >= TRANSFER_CRC_SIZE_BYTES)
{
result = rxSlotEject(self, frame, rx_transfer, extent, memory_fragment, memory_payload);
// NOLINTNEXTLINE(clang-analyzer-security.insecureAPI.DeprecatedOrUnsafeBufferHandling)
(void) memset(rx_transfer, 0, sizeof(struct UdpardRxTransfer)); // Safety.
rx_transfer->timestamp_usec = self->ts_usec;
rx_transfer->priority = frame.meta.priority;
rx_transfer->source_node_id = frame.meta.src_node_id;
rx_transfer->transfer_id = frame.meta.transfer_id;
//
result = rxSlotEject(&rx_transfer->payload_size,
&rx_transfer->payload,
self->fragments,
self->payload_size,
extent,
memory_fragment,
memory_payload)
? 1
: 0;
// The tree is now unusable and the data is moved into rx_transfer.
self->fragments = NULL;
}
}
finish:
Expand Down Expand Up @@ -1182,6 +1211,14 @@ static inline int_fast8_t rxIfaceUpdate(RxIface* const sel
return result;
}

void udpardFragmentFree(const struct UdpardFragment head,
struct UdpardMemoryResource* const memory_fragment,
struct UdpardMemoryResource* const memory_payload)
{
memFreePayload(memory_payload, head.origin); // May be NULL, is okay.
rxFragmentFree(head.next, memory_fragment, memory_payload); // The head is not heap-allocated so not freed.
}

int8_t udpardRxSubscriptionInit(struct UdpardRxSubscription* const self,
const UdpardPortID subject_id,
const size_t extent,
Expand Down
11 changes: 7 additions & 4 deletions libudpard/udpard.h
Original file line number Diff line number Diff line change
Expand Up @@ -739,13 +739,16 @@ struct UdpardRxTransfer
};

/// This is, essentially, a helper that frees the memory allocated for the payload and its fragment headers
/// using the correct memory resource. The application can do the same thing manually if it has access to the
/// using the correct memory resources. The application can do the same thing manually if it has access to the
/// required context to compute the size, or if the memory resource implementation does not require deallocation size.
///
/// The head of the fragment list is passed by value so it is not freed. This is in line with the UdpardRxTransfer
/// design, where the head is stored by value to reduce indirection in small transfers. We call it Scott's Head.
///
/// If any of the arguments are NULL, the function has no effect.
void udpardRxTransferFree(struct UdpardRxTransfer* const self,
struct UdpardMemoryResource* const memory_fragment,
struct UdpardMemoryResource* const memory_payload);
void udpardFragmentFree(const struct UdpardFragment head,
struct UdpardMemoryResource* const memory_fragment,
struct UdpardMemoryResource* const memory_payload);

// --------------------------------------------- SUBJECTS ---------------------------------------------

Expand Down
2 changes: 2 additions & 0 deletions tests/.idea/dictionaries/pavel.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 77fdca1

Please sign in to comment.