Skip to content

Commit

Permalink
WIP: Start system event loop at the beginning of the program and do s…
Browse files Browse the repository at this point in the history
…tartup coordination through it
  • Loading branch information
erlingrj committed Mar 4, 2025
1 parent 6f291de commit bbeb9ec
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 210 deletions.
4 changes: 3 additions & 1 deletion include/reactor-uc/clock_synchronization.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ struct ClockSynchronization {
NeighborClock *neighbor_clock; // Pointer to an array of neighbor clocks, one for each neighbor.
size_t num_neighbours; // Number of neighbors, length of the neighbor_clock array.
bool is_grandmaster; // Whether this node is the grandmaster.
bool has_initial_sync; // Whether the initial sync has been completed.
int master_neighbor_index; // The index of the master neighbor, if this node is not the grandmaster.
int sequence_number; // The sequence number of the last sent sync request message (if slave).
interval_t period; // The period between sync request messages are sent to the neighbor master.
ClockSyncTimestamps timestamps; // The timestamps used to compute clock offset.
ClockServo servo; // The PID controller
FederateMessage msg; // A FederateMessage used for transmitting sync request and follow-up messages.
// TODO: Is this message here needed? Why cant we use the one on the bundle?
FederateMessage msg; // A FederateMessage used for transmitting sync request and follow-up messages.
void (*handle_message_callback)(ClockSynchronization *self, const ClockSyncMessage *msg, size_t bundle_idx);
};

Expand Down
3 changes: 1 addition & 2 deletions include/reactor-uc/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
#define LF_COLORIZE_LOGS 1
#endif

#define LF_LOG_LEVEL_CLOCK_SYNC LF_LOG_LEVEL_DEBUG
#define LF_LOG_LEVEL_SCHED LF_LOG_LEVEL_DEBUG
#define LF_LOG_LEVEL_FED LF_LOG_LEVEL_DEBUG

/** Add timestamps to each log entry. */
#if !defined(LF_TIMESTAMP_LOGS) && !defined(PLATFORM_FLEXPRET)
Expand Down
9 changes: 6 additions & 3 deletions include/reactor-uc/macros_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -513,15 +513,18 @@ typedef struct FederatedInputConnection FederatedInputConnection;

#define LF_FEDERATED_INPUT_CONNECTION_INSTANCE(ReactorName, InputName) ReactorName##_##InputName##_conn InputName

#define LF_DEFINE_STARTUP_COORDINATOR_STRUCT(ReactorName, NumNeighbors) \
#define LF_DEFINE_STARTUP_COORDINATOR_STRUCT(ReactorName, NumNeighbors, NumEvents) \
typedef struct { \
StartupCoordinator super; \
StartupEvent events[(NumEvents)]; \
bool used[(NumEvents)]; \
NeighborState neighbors[NumNeighbors]; \
} ReactorName##StartupCoordinator;

#define LF_DEFINE_STARTUP_COORDINATOR_CTOR(ReactorName, NumNeighbors, LongestPath) \
#define LF_DEFINE_STARTUP_COORDINATOR_CTOR(ReactorName, NumNeighbors, LongestPath, NumEvents) \
void ReactorName##StartupCoordinator_ctor(ReactorName##StartupCoordinator *self, Environment *env) { \
StartupCoordinator_ctor(&self->super, env, self->neighbors, NumNeighbors, LongestPath); \
StartupCoordinator_ctor(&self->super, env, self->neighbors, NumNeighbors, LongestPath, sizeof(StartupEvent), \
(void *)self->events, self->used, (NumEvents)); \
}

#define LF_DEFINE_STARTUP_COORDINATOR(ReactorName) ReactorName##StartupCoordinator startup_coordinator;
Expand Down
2 changes: 2 additions & 0 deletions include/reactor-uc/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ struct Scheduler {
*/
void (*run)(Scheduler *self);

void (*step_clock)(Scheduler *self, interval_t step);

/**
* @brief Called to execute all reactions triggered by a shutdown trigger.
*/
Expand Down
14 changes: 10 additions & 4 deletions include/reactor-uc/startup_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@ typedef struct {
size_t start_time_proposals_received; // The number of start time proposals received from this neighbor.
} NeighborState;

/** The payload of a StartupCoordinator event. */
typedef struct {
int neighbor_index;
StartupCoordination msg;
} StartupEvent;

/** This structure is used to coordinate the startup of the federation. */
struct StartupCoordinator {
SystemEventHandler super;
Environment *env;
size_t longest_path;
StartupCoordinationState state;
Expand All @@ -28,12 +35,11 @@ struct StartupCoordinator {
FederateMessage msg;
instant_t start_time_proposal;
void (*handle_message_callback)(StartupCoordinator *self, const StartupCoordination *msg, size_t bundle_idx);
lf_ret_t (*connect_to_neigbors)(StartupCoordinator *self);
lf_ret_t (*perform_handshake)(StartupCoordinator *self);
instant_t (*negotiate_start_time)(StartupCoordinator *self);
lf_ret_t (*connect_to_neighbors_blocking)(StartupCoordinator *self);
};

void StartupCoordinator_ctor(StartupCoordinator *self, Environment *env, NeighborState *neighbor_state,
size_t num_neighbors, size_t longest_path);
size_t num_neighbors, size_t longest_path, size_t payload_size, void *payload_buf,
bool *payload_used_buf, size_t payload_buf_capacity);

#endif // REACTOR_UC_STARTUP_COORDINATOR_H
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ class UcMainGeneratorFederated(

override fun getNumSystemEvents(): Int {
val clockSyncSystemEvents = UcClockSyncGenerator.getNumSystemEvents(netBundlesSize)
return clockSyncSystemEvents
val startupCoordinatorEvents = UcStartupCoordinatorGenerator.getNumSystemEvents(netBundlesSize)
return clockSyncSystemEvents + startupCoordinatorEvents
}

override fun generateStartSource() =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,32 @@ class UcStartupCoordinatorGenerator(
) {

companion object {
// The number of system events allocated for each neigbor. Used to schedule received messages as
// system events.
val numSystemEventsPerBundle = 2

// The number of additional system events allocated. This system event is used for the periodic
// SyncRequest event.
val numSystemEventsConst = 2

// Returns the number of system events needed by the clock sync subsystem, given a number of
// neighbors.
fun getNumSystemEvents(numBundles: Int) =
numSystemEventsPerBundle * numBundles + numSystemEventsConst

val instName = "startup_coordinator"
}

private val numNeighbors = connectionGenerator.getNumFederatedConnectionBundles()
private val numSystemEvents = getNumSystemEvents(numNeighbors)
private val longestPath = connectionGenerator.getLongestFederatePath()
private val typeName = "Federate"

fun generateSelfStruct() = "LF_DEFINE_STARTUP_COORDINATOR_STRUCT(${typeName}, ${numNeighbors})"
fun generateSelfStruct() =
"LF_DEFINE_STARTUP_COORDINATOR_STRUCT(${typeName}, ${numNeighbors}, ${numSystemEvents})"

fun generateCtor() =
"LF_DEFINE_STARTUP_COORDINATOR_CTOR(Federate, ${numNeighbors}, ${longestPath});"
"LF_DEFINE_STARTUP_COORDINATOR_CTOR(Federate, ${numNeighbors}, ${longestPath}, ${numSystemEvents});"

fun generateFederateStructField() = "${typeName}StartupCoordinator ${instName};"

Expand Down
11 changes: 11 additions & 0 deletions src/clock_synchronization.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ static void ClockSynchronization_correct_clock(ClockSynchronization *self, Clock
interval_t clock_offset = owd - (timestamps->t2 - timestamps->t1);
LF_DEBUG(CLOCK_SYNC, "RTT: " PRINTF_TIME " OWD: " PRINTF_TIME " offset: " PRINTF_TIME, rtt, owd, clock_offset);

// The very first iteration of clock sync we step the clock to the first offset we receive.
if (!self->has_initial_sync) {
self->has_initial_sync = true;
self->env->clock.set_time(&self->env->clock, self->env->clock.get_time(&self->env->clock) + clock_offset);
// Also inform the scheduler that we have stepped the clock so it can adjust timestamps
// of pending events.
self->env->scheduler->step_clock(self->env->scheduler, clock_offset);
self->env->platform->new_async_event(self->env->platform);
return;
}

self->servo.last_error = clock_offset;
self->servo.accumulated_error += clock_offset;
float correction_float = self->servo.Kp * clock_offset + self->servo.Ki * self->servo.accumulated_error;
Expand Down
15 changes: 4 additions & 11 deletions src/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,19 @@ void Environment_assemble(Environment *self) {
lf_ret_t ret;
Environment_validate(self);

// Establish connections to all neighbors:
if (self->is_federated) {
ret = self->startup_coordinator->connect_to_neigbors(self->startup_coordinator);
validate(ret == LF_OK);
ret = self->startup_coordinator->perform_handshake(self->startup_coordinator);
ret = self->startup_coordinator->connect_to_neighbors_blocking(self->startup_coordinator);
validate(ret == LF_OK);
}
}

void Environment_start(Environment *self) {
instant_t start_time;
if (self->is_federated) {
start_time = self->startup_coordinator->negotiate_start_time(self->startup_coordinator);
// If we are a clock sync slave, we start by setting the current time to the start time.
if (self->do_clock_sync && !self->clock_sync->is_grandmaster) {
self->clock.set_time(&self->clock, start_time);
}
} else {
if (!self->is_federated) {
start_time = self->get_physical_time(self);
self->scheduler->set_and_schedule_start_tag(self->scheduler, start_time);
}
self->scheduler->set_and_schedule_start_tag(self->scheduler, start_time);
self->scheduler->run(self->scheduler);
}

Expand Down
18 changes: 17 additions & 1 deletion src/schedulers/dynamic/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ void Scheduler_run(Scheduler *untyped_self) {
LF_DEBUG(SCHED, "Scheduler running with non_terminating=%d has_async_events=%d", non_terminating,
env->has_async_events);

while (non_terminating || !self->event_queue->empty(self->event_queue)) {
while (non_terminating || !self->event_queue->empty(self->event_queue) || untyped_self->start_time == NEVER) {
next_tag = self->event_queue->next_tag(self->event_queue);
if (self->system_event_queue) {
next_system_tag = self->system_event_queue->next_tag(self->system_event_queue);
Expand Down Expand Up @@ -469,6 +469,21 @@ void Scheduler_request_shutdown(Scheduler *untyped_self) {
env->leave_critical_section(env);
}

static void Scheduler_step_clock(Scheduler *_self, interval_t step) {
DynamicScheduler *self = (DynamicScheduler *)_self;

EventQueue *q = self->system_event_queue;
for (size_t i = 0; i < q->size; i++) {
ArbitraryEvent event = q->array[i];
instant_t old_tag = event.system_event.super.tag.time;
instant_t new_tag = old_tag + step;
if (new_tag < 0) {
new_tag = 0;
}
event.system_event.super.tag.time = new_tag;
}
}

lf_ret_t Scheduler_add_to_reaction_queue(Scheduler *untyped_self, Reaction *reaction) {
DynamicScheduler *self = (DynamicScheduler *)untyped_self;

Expand Down Expand Up @@ -505,6 +520,7 @@ void DynamicScheduler_ctor(DynamicScheduler *self, Environment *env, EventQueue
self->super.set_and_schedule_start_tag = Scheduler_set_and_schedule_start_tag;
self->super.add_to_reaction_queue = Scheduler_add_to_reaction_queue;
self->super.current_tag = Scheduler_current_tag;
self->super.step_clock = Scheduler_step_clock;
}

Scheduler *Scheduler_new(Environment *env, EventQueue *event_queue, EventQueue *system_event_queue,
Expand Down
Loading

0 comments on commit bbeb9ec

Please sign in to comment.