Skip to content

Commit

Permalink
Slight internal refactoring to uphold orthogonality
Browse files Browse the repository at this point in the history
  • Loading branch information
pavel-kirienko committed Jul 27, 2023
1 parent f9fe930 commit da7c3c1
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 77 deletions.
93 changes: 55 additions & 38 deletions libudpard/udpard.c
Original file line number Diff line number Diff line change
Expand Up @@ -672,13 +672,20 @@ void udpardTxFree(struct UdpardMemoryResource* const memory, struct UdpardTxItem
// ================================================= RX PIPELINE =================================================
// =====================================================================================================================

/// All but the transfer metadata.
typedef struct
{
TransferMetadata meta;
uint32_t index;
bool end_of_transfer;
struct UdpardPayload payload; ///< Also contains the transfer CRC (but not the header CRC).
struct UdpardMutablePayload origin; ///< The entirety of the free-able buffer passed from the application.
} RxFrameBase;

/// Full frame state.
typedef struct
{
RxFrameBase base;
TransferMetadata meta;
} RxFrame;

/// The primitive deserialization functions are endian-agnostic.
Expand Down Expand Up @@ -723,8 +730,8 @@ static inline const byte_t* txDeserializeU64(const byte_t* const source_buffer,
static inline bool rxParseFrame(const struct UdpardMutablePayload datagram_payload, RxFrame* const out)
{
UDPARD_ASSERT((out != NULL) && (datagram_payload.data != NULL));
out->origin = datagram_payload;
bool ok = false;
out->base.origin = datagram_payload;
bool ok = false;
if (datagram_payload.size > 0) // HEADER_SIZE_BYTES may change in the future depending on the header version.
{
const byte_t* ptr = (const byte_t*) datagram_payload.data;
Expand All @@ -736,33 +743,33 @@ static inline bool rxParseFrame(const struct UdpardMutablePayload datagram_paylo
const uint_fast8_t prio = *ptr++;
if (prio <= UDPARD_PRIORITY_MAX)
{
out->meta.priority = (enum UdpardPriority) prio;
ptr = txDeserializeU16(ptr, &out->meta.src_node_id);
ptr = txDeserializeU16(ptr, &out->meta.dst_node_id);
ptr = txDeserializeU16(ptr, &out->meta.data_specifier);
ptr = txDeserializeU64(ptr, &out->meta.transfer_id);
uint32_t index_eot = 0;
ptr = txDeserializeU32(ptr, &index_eot);
out->index = (uint32_t) (index_eot & HEADER_FRAME_INDEX_MASK);
out->end_of_transfer = (index_eot & HEADER_FRAME_INDEX_EOT_MASK) != 0U;
out->meta.priority = (enum UdpardPriority) prio;
ptr = txDeserializeU16(ptr, &out->meta.src_node_id);
ptr = txDeserializeU16(ptr, &out->meta.dst_node_id);
ptr = txDeserializeU16(ptr, &out->meta.data_specifier);
ptr = txDeserializeU64(ptr, &out->meta.transfer_id);
uint32_t index_eot = 0;
ptr = txDeserializeU32(ptr, &index_eot);
out->base.index = (uint32_t) (index_eot & HEADER_FRAME_INDEX_MASK);
out->base.end_of_transfer = (index_eot & HEADER_FRAME_INDEX_EOT_MASK) != 0U;
ptr += 2; // Opaque user data.
ptr += HEADER_CRC_SIZE_BYTES;
out->payload.data = ptr;
out->payload.size = datagram_payload.size - HEADER_SIZE_BYTES;
ok = true;
out->base.payload.data = ptr;
out->base.payload.size = datagram_payload.size - HEADER_SIZE_BYTES;
ok = true;
UDPARD_ASSERT((ptr == (((const byte_t*) datagram_payload.data) + HEADER_SIZE_BYTES)) &&
(out->payload.size > 0U));
(out->base.payload.size > 0U));
}
}
// Parsers for other header versions may be added here later.
}
if (ok) // Version-agnostic semantics check.
{
UDPARD_ASSERT(out->payload.size > 0); // Follows from the prior checks.
UDPARD_ASSERT(out->base.payload.size > 0); // Follows from the prior checks.
const bool anonymous = out->meta.src_node_id == UDPARD_NODE_ID_UNSET;
const bool broadcast = out->meta.dst_node_id == UDPARD_NODE_ID_UNSET;
const bool service = (out->meta.data_specifier & DATA_SPECIFIER_SERVICE_NOT_MESSAGE_MASK) != 0;
const bool single_frame = (out->index == 0) && out->end_of_transfer;
const bool single_frame = (out->base.index == 0) && out->base.end_of_transfer;
ok = service ? ((!broadcast) && (!anonymous)) : (broadcast && ((!anonymous) || single_frame));
}
return ok;
Expand Down Expand Up @@ -1053,16 +1060,18 @@ static inline bool rxSlotEject(size_t* const out_payload_si
}

/// This function will either move the frame payload into the session, or free it if it cannot be made use of.
/// Returns: 1 -- transfer available; 0 -- transfer not yet available; <0 -- error.
static inline int_fast8_t rxSlotUpdate(RxSlot* const self,
const RxFrame frame,
struct UdpardRxTransfer* const rx_transfer,
/// Upon return, certain state variables may be overwritten, so the caller should not rely on them.
/// Returns: 1 -- transfer available, payload written; 0 -- transfer not yet available; <0 -- error.
static inline int_fast8_t rxSlotAccept(RxSlot* const self,
size_t* const out_transfer_payload_size,
struct UdpardFragment* const out_transfer_payload_head,
const RxFrameBase frame,
const size_t extent,
struct UdpardMemoryResource* const memory_fragment,
struct UdpardMemoryResource* const memory_payload)
{
UDPARD_ASSERT((self != NULL) && (self->transfer_id == frame.meta.transfer_id) && (frame.payload.size > 0) &&
(rx_transfer != NULL) && memIsValid(memory_fragment));
UDPARD_ASSERT((self != NULL) && (frame.payload.size > 0) && (out_transfer_payload_size != NULL) &&
(out_transfer_payload_head != NULL) && memIsValid(memory_fragment));
int_fast8_t result = 0;
bool restart = false;
bool release = true;
Expand All @@ -1080,7 +1089,7 @@ static inline int_fast8_t rxSlotUpdate(RxSlot* const self,
UDPARD_ASSERT(frame.index <= self->max_index);
if (self->max_index > self->eot_index)
{
restart = true; // Frames past EOT, discard the entire transfer because we don't trust it.
restart = true; // Frames past EOT found, discard the entire transfer because we don't trust it anymore.
goto finish;
}
// SECOND: Insert the fragment into the fragment tree. If it already exists, drop and free the duplicate.
Expand Down Expand Up @@ -1117,15 +1126,8 @@ static inline int_fast8_t rxSlotUpdate(RxSlot* const self,
restart = true;
if (self->payload_size >= TRANSFER_CRC_SIZE_BYTES)
{
// NOLINTNEXTLINE(clang-analyzer-security.insecureAPI.DeprecatedOrUnsafeBufferHandling)
(void) memset(rx_transfer, 0, sizeof(struct UdpardRxTransfer)); // Safety.
rx_transfer->timestamp_usec = self->ts_usec;
rx_transfer->priority = frame.meta.priority;
rx_transfer->source_node_id = frame.meta.src_node_id;
rx_transfer->transfer_id = frame.meta.transfer_id;
//
result = rxSlotEject(&rx_transfer->payload_size,
&rx_transfer->payload,
result = rxSlotEject(out_transfer_payload_size,
out_transfer_payload_head,
self->fragments,
self->payload_size,
extent,
Expand All @@ -1140,7 +1142,7 @@ static inline int_fast8_t rxSlotUpdate(RxSlot* const self,
finish:
if (restart)
{
rxSlotRestart(self, frame.meta.transfer_id + 1, memory_fragment, memory_payload);
rxSlotRestart(self, self->transfer_id + 1U, memory_fragment, memory_payload);
}
if (release)
{
Expand All @@ -1158,7 +1160,7 @@ static inline int_fast8_t rxIfaceUpdate(RxIface* const sel
const UdpardMicrosecond transfer_id_timeout_usec,
const struct UdpardRxMemoryResources memory)
{
UDPARD_ASSERT((self != NULL) && (frame.payload.size > 0) && (received_transfer != NULL));
UDPARD_ASSERT((self != NULL) && (frame.base.payload.size > 0) && (received_transfer != NULL));
RxSlot* slot = NULL;
// First we should check if there is an existing slot for this transfer; if yes, this is the simplest case.
for (uint_fast8_t i = 0; i < RX_SLOT_COUNT; i++)
Expand Down Expand Up @@ -1202,11 +1204,26 @@ static inline int_fast8_t rxIfaceUpdate(RxIface* const sel
{
slot->ts_usec = ts_usec; // Transfer timestamp is the timestamp of the earliest frame.
}
result = rxSlotUpdate(slot, frame, received_transfer, extent, memory.fragment, memory.payload);
const UdpardMicrosecond ts = slot->ts_usec;
UDPARD_ASSERT(slot->transfer_id == frame.meta.transfer_id);
result = rxSlotAccept(slot, // May invalidate state variables such as timestamp or transfer-ID.
&received_transfer->payload_size,
&received_transfer->payload,
frame.base,
extent,
memory.fragment,
memory.payload);
if (result > 0) // Transfer successfully received, populate the transfer descriptor for the client.
{
received_transfer->timestamp_usec = ts;
received_transfer->priority = frame.meta.priority;
received_transfer->source_node_id = frame.meta.src_node_id;
received_transfer->transfer_id = frame.meta.transfer_id;
}
}
else
{
memFreePayload(memory.payload, frame.origin);
memFreePayload(memory.payload, frame.base.origin);
}
return result;
}
Expand Down
1 change: 1 addition & 0 deletions tests/.idea/dictionaries/pavel.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 37 additions & 39 deletions tests/src/test_intrusive_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ static void testParseFrameValidMessage(void)
TEST_ASSERT_EQUAL_UINT64(UDPARD_NODE_ID_UNSET, rxf.meta.dst_node_id);
TEST_ASSERT_EQUAL_UINT64(7654, rxf.meta.data_specifier);
TEST_ASSERT_EQUAL_UINT64(0xbadc0ffee0ddf00d, rxf.meta.transfer_id);
TEST_ASSERT_EQUAL_UINT64(12345, rxf.index);
TEST_ASSERT_FALSE(rxf.end_of_transfer);
TEST_ASSERT_EQUAL_UINT64(3, rxf.payload.size);
TEST_ASSERT_EQUAL_UINT8_ARRAY("abc", rxf.payload.data, 3);
TEST_ASSERT_EQUAL_UINT64(sizeof(data), rxf.origin.size);
TEST_ASSERT_EQUAL_UINT8_ARRAY(data, rxf.origin.data, sizeof(data));
TEST_ASSERT_EQUAL_UINT64(12345, rxf.base.index);
TEST_ASSERT_FALSE(rxf.base.end_of_transfer);
TEST_ASSERT_EQUAL_UINT64(3, rxf.base.payload.size);
TEST_ASSERT_EQUAL_UINT8_ARRAY("abc", rxf.base.payload.data, 3);
TEST_ASSERT_EQUAL_UINT64(sizeof(data), rxf.base.origin.size);
TEST_ASSERT_EQUAL_UINT8_ARRAY(data, rxf.base.origin.data, sizeof(data));
}

static void testParseFrameValidRPCService(void)
Expand All @@ -55,12 +55,12 @@ static void testParseFrameValidRPCService(void)
DATA_SPECIFIER_SERVICE_REQUEST_NOT_RESPONSE_MASK,
rxf.meta.data_specifier);
TEST_ASSERT_EQUAL_UINT64(0xbadc0ffee0ddf00d, rxf.meta.transfer_id);
TEST_ASSERT_EQUAL_UINT64(6654, rxf.index);
TEST_ASSERT_FALSE(rxf.end_of_transfer);
TEST_ASSERT_EQUAL_UINT64(3, rxf.payload.size);
TEST_ASSERT_EQUAL_UINT8_ARRAY("abc", rxf.payload.data, 3);
TEST_ASSERT_EQUAL_UINT64(sizeof(data), rxf.origin.size);
TEST_ASSERT_EQUAL_UINT8_ARRAY(data, rxf.origin.data, sizeof(data));
TEST_ASSERT_EQUAL_UINT64(6654, rxf.base.index);
TEST_ASSERT_FALSE(rxf.base.end_of_transfer);
TEST_ASSERT_EQUAL_UINT64(3, rxf.base.payload.size);
TEST_ASSERT_EQUAL_UINT8_ARRAY("abc", rxf.base.payload.data, 3);
TEST_ASSERT_EQUAL_UINT64(sizeof(data), rxf.base.origin.size);
TEST_ASSERT_EQUAL_UINT8_ARRAY(data, rxf.base.origin.data, sizeof(data));
}

static void testParseFrameValidMessageAnonymous(void)
Expand All @@ -75,12 +75,12 @@ static void testParseFrameValidMessageAnonymous(void)
TEST_ASSERT_EQUAL_UINT64(UDPARD_NODE_ID_UNSET, rxf.meta.dst_node_id);
TEST_ASSERT_EQUAL_UINT64(7654, rxf.meta.data_specifier);
TEST_ASSERT_EQUAL_UINT64(0xbadc0ffee0ddf00d, rxf.meta.transfer_id);
TEST_ASSERT_EQUAL_UINT64(0, rxf.index);
TEST_ASSERT_TRUE(rxf.end_of_transfer);
TEST_ASSERT_EQUAL_UINT64(3, rxf.payload.size);
TEST_ASSERT_EQUAL_UINT8_ARRAY("abc", rxf.payload.data, 3);
TEST_ASSERT_EQUAL_UINT64(sizeof(data), rxf.origin.size);
TEST_ASSERT_EQUAL_UINT8_ARRAY(data, rxf.origin.data, sizeof(data));
TEST_ASSERT_EQUAL_UINT64(0, rxf.base.index);
TEST_ASSERT_TRUE(rxf.base.end_of_transfer);
TEST_ASSERT_EQUAL_UINT64(3, rxf.base.payload.size);
TEST_ASSERT_EQUAL_UINT8_ARRAY("abc", rxf.base.payload.data, 3);
TEST_ASSERT_EQUAL_UINT64(sizeof(data), rxf.base.origin.size);
TEST_ASSERT_EQUAL_UINT8_ARRAY(data, rxf.base.origin.data, sizeof(data));
}

static void testParseFrameRPCServiceAnonymous(void)
Expand Down Expand Up @@ -295,12 +295,8 @@ static void testSlotEjectValidLarge(void)
instrumentedAllocatorNew(&mem_fragment);
instrumentedAllocatorNew(&mem_payload);
//>>> from pycyphal.transport.commons.crc import CRC32C
//>>> data = ...
//>>> CRC32C.new(data).value_as_bytes
static const char DaShi[] = "Da Shi, have you ever... considered certain ultimate philosophical questions? "
"For example, where does Man come from? "
"Where does Man go? "
"Where does the universe come from? ";
//>>> CRC32C.new(data_bytes).value_as_bytes
static const size_t PayloadSize = 171;
// Build the fragment tree:
// 2
// / `
Expand Down Expand Up @@ -336,7 +332,7 @@ static void testSlotEjectValidLarge(void)
->tree.base;
// Initialization done, ensure the memory utilization is as we expect.
TEST_ASSERT_EQUAL(4, mem_payload.allocated_fragments);
TEST_ASSERT_EQUAL(sizeof(DaShi) - 1 + TRANSFER_CRC_SIZE_BYTES, mem_payload.allocated_bytes);
TEST_ASSERT_EQUAL(PayloadSize + TRANSFER_CRC_SIZE_BYTES, mem_payload.allocated_bytes);
TEST_ASSERT_EQUAL(4, mem_fragment.allocated_fragments);
TEST_ASSERT_EQUAL(sizeof(RxFragment) * 4, mem_fragment.allocated_bytes);
// Eject and verify the payload.
Expand All @@ -349,8 +345,8 @@ static void testSlotEjectValidLarge(void)
1024,
&mem_fragment.base,
&mem_payload.base));
TEST_ASSERT_EQUAL(sizeof(DaShi) - 1, payload_size); // CRC removed!
TEST_ASSERT( //
TEST_ASSERT_EQUAL(PayloadSize, payload_size); // CRC removed!
TEST_ASSERT( //
compareStringWithPayload("Da Shi, have you ever... considered certain ultimate philosophical questions? ",
payload.view));
TEST_ASSERT(compareStringWithPayload("For example, where does Man come from? ", payload.next->view));
Expand All @@ -360,7 +356,7 @@ static void testSlotEjectValidLarge(void)
// Check the memory utilization. All payload fragments are still kept, but the first fragment is freed because of
// the Scott's short payload optimization.
TEST_ASSERT_EQUAL(4, mem_payload.allocated_fragments);
TEST_ASSERT_EQUAL(sizeof(DaShi) - 1 + TRANSFER_CRC_SIZE_BYTES, mem_payload.allocated_bytes);
TEST_ASSERT_EQUAL(PayloadSize + TRANSFER_CRC_SIZE_BYTES, mem_payload.allocated_bytes);
TEST_ASSERT_EQUAL(3, mem_fragment.allocated_fragments); // One gone!!1
TEST_ASSERT_EQUAL(sizeof(RxFragment) * 3, mem_fragment.allocated_bytes); // yes yes!
// Now, free the payload as the application would.
Expand All @@ -379,13 +375,8 @@ static void testSlotEjectValidSmall(void)
instrumentedAllocatorNew(&mem_fragment);
instrumentedAllocatorNew(&mem_payload);
//>>> from pycyphal.transport.commons.crc import CRC32C
//>>> data = ...
//>>> CRC32C.new(data).value_as_bytes
static const char BuildSea[] = "Did you build this four-dimensional fragment?\n"
"You told me that you came from the sea. Did you build the sea?\n"
"Are you saying that for you, or at least for your creators, "
"this four-dimensional space is like the sea for us?\n"
"More like a puddle. The sea has gone dry.";
//>>> CRC32C.new(data_bytes).value_as_bytes
static const size_t PayloadSize = 262;
// Build the fragment tree:
// 1
// / `
Expand Down Expand Up @@ -428,7 +419,7 @@ static void testSlotEjectValidSmall(void)
->tree.base;
// Initialization done, ensure the memory utilization is as we expect.
TEST_ASSERT_EQUAL(5, mem_payload.allocated_fragments);
TEST_ASSERT_EQUAL(sizeof(BuildSea) - 1 + TRANSFER_CRC_SIZE_BYTES, mem_payload.allocated_bytes);
TEST_ASSERT_EQUAL(PayloadSize + TRANSFER_CRC_SIZE_BYTES, mem_payload.allocated_bytes);
TEST_ASSERT_EQUAL(5, mem_fragment.allocated_fragments);
TEST_ASSERT_EQUAL(sizeof(RxFragment) * 5, mem_fragment.allocated_bytes);
// Eject and verify the payload. Use a small extent and ensure the excess is dropped.
Expand Down Expand Up @@ -547,9 +538,16 @@ static void testSlotEjectInvalid(void)
TEST_ASSERT_EQUAL(0, mem_fragment.allocated_bytes);
}

static void testSlotUpdateA(void)
static void testSlotAcceptA(void)
{
//
InstrumentedAllocator mem_fragment = {0};
InstrumentedAllocator mem_payload = {0};
instrumentedAllocatorNew(&mem_fragment);
instrumentedAllocatorNew(&mem_payload);
size_t payload_size = 0;
struct UdpardFragment payload = {0};
(void) payload_size;
(void) payload;
}

void setUp(void) {}
Expand All @@ -575,7 +573,7 @@ int main(void)
RUN_TEST(testSlotEjectValidSmall);
RUN_TEST(testSlotEjectValidEmpty);
RUN_TEST(testSlotEjectInvalid);
RUN_TEST(testSlotUpdateA);
RUN_TEST(testSlotAcceptA);
return UNITY_END();
}

Expand Down

0 comments on commit da7c3c1

Please sign in to comment.