diff --git a/core/environment.c b/core/environment.c index 4523c4721..bf8186c69 100644 --- a/core/environment.c +++ b/core/environment.c @@ -211,15 +211,18 @@ void environment_free(environment_t* env) { } void environment_init_tags(environment_t* env, instant_t start_time, interval_t duration) { - env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; + // Current tag and start tag of the environment is initialized. + env->current_tag = (tag_t){.time = start_time, .microstep = 0}; + env->start_tag = (tag_t){.time = start_time, .microstep = 0}; + env->duration = duration; - tag_t stop_tag = FOREVER_TAG_INITIALIZER; if (duration >= 0LL) { // A duration has been specified. Calculate the stop time. - stop_tag.time = env->current_tag.time + duration; - stop_tag.microstep = 0; + env->stop_tag.time = env->start_tag.time + env->duration; + env->stop_tag.microstep = 0; + } else { + env->stop_tag = (tag_t)FOREVER_TAG_INITIALIZER; } - env->stop_tag = stop_tag; } int environment_init(environment_t* env, const char* name, int id, int num_workers, int num_timers, diff --git a/core/reactor.c b/core/reactor.c index 00df9e07f..d050d8498 100644 --- a/core/reactor.c +++ b/core/reactor.c @@ -337,11 +337,25 @@ int lf_reactor_c_main(int argc, const char* argv[]) { #ifndef FEDERATED lf_tracing_set_start_time(start_time); #endif + // Create and initialize the environment + _lf_create_environments(); // code-generated function + environment_t* env; + int num_environments = _lf_get_environments(&env); + LF_ASSERT(num_environments == 1, "Found %d environments. Only 1 can be used with the single-threaded runtime", + num_environments); + + LF_PRINT_DEBUG("Initializing."); + initialize_global(); + // Set start time + start_time = lf_time_physical(); LF_PRINT_DEBUG("NOTE: FOREVER is displayed as " PRINTF_TAG " and NEVER as " PRINTF_TAG, FOREVER_TAG.time - start_time, FOREVER_TAG.microstep, NEVER_TAG.time - start_time, 0); environment_init_tags(env, start_time, duration); + env->current_tag = env->start_tag; + // Start tracing if enalbed + start_trace(env->trace); #ifdef MODAL_REACTORS // Set up modal infrastructure _lf_initialize_modes(env); diff --git a/core/reactor_common.c b/core/reactor_common.c index 33e5582f5..c7965cd05 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -374,260 +374,256 @@ void _lf_initialize_timer(environment_t* env, trigger_t* timer) { // Schedule at t + period. delay = timer->period; } - } else { - // Schedule at t + offset. - delay = timer->offset; - } - // Get an event_t struct to put on the event queue. - // Recycle event_t structs, if possible. - event_t* e = lf_get_new_event(env); - e->trigger = timer; - e->base.tag = (tag_t){.time = lf_time_logical(env) + delay, .microstep = 0}; - // NOTE: No lock is being held. Assuming this only happens at startup. - pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e); - tracepoint_schedule(env, timer, delay); // Trace even though schedule is not called. -} + // Get an event_t struct to put on the event queue. + // Recycle event_t structs, if possible. + event_t* e = lf_get_new_event(env); + e->trigger = timer; + e->base.tag = (tag_t){.time = lf_time_logical(env) + delay, .microstep = 0}; + // NOTE: No lock is being held. Assuming this only happens at startup. + pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e); + tracepoint_schedule(env, timer, delay); // Trace even though schedule is not called. + } -void _lf_initialize_timers(environment_t* env) { - assert(env != GLOBAL_ENVIRONMENT); - for (int i = 0; i < env->timer_triggers_size; i++) { - if (env->timer_triggers[i] != NULL) { - _lf_initialize_timer(env, env->timer_triggers[i]); + void _lf_initialize_timers(environment_t * env) { + assert(env != GLOBAL_ENVIRONMENT); + for (int i = 0; i < env->timer_triggers_size; i++) { + if (env->timer_triggers[i] != NULL) { + _lf_initialize_timer(env, env->timer_triggers[i]); + } } - } - // To avoid runtime memory allocations for timer-driven programs - // the recycle queue is initialized with a single event. - if (env->timer_triggers_size > 0) { - event_t* e = lf_get_new_event(env); - lf_recycle_event(env, e); + // To avoid runtime memory allocations for timer-driven programs + // the recycle queue is initialized with a single event. + if (env->timer_triggers_size > 0) { + event_t* e = lf_get_new_event(env); + lf_recycle_event(env, e); + } } -} -void _lf_trigger_startup_reactions(environment_t* env) { - assert(env != GLOBAL_ENVIRONMENT); - for (int i = 0; i < env->startup_reactions_size; i++) { - if (env->startup_reactions[i] != NULL) { - if (env->startup_reactions[i]->mode != NULL) { - // Skip reactions in modes - continue; + void _lf_trigger_startup_reactions(environment_t * env) { + assert(env != GLOBAL_ENVIRONMENT); + for (int i = 0; i < env->startup_reactions_size; i++) { + if (env->startup_reactions[i] != NULL) { + if (env->startup_reactions[i]->mode != NULL) { + // Skip reactions in modes + continue; + } + _lf_trigger_reaction(env, env->startup_reactions[i], -1); } - _lf_trigger_reaction(env, env->startup_reactions[i], -1); } - } #ifdef MODAL_REACTORS - if (env->modes) { - _lf_handle_mode_startup_reset_reactions(env, env->startup_reactions, env->startup_reactions_size, NULL, 0, - env->modes->modal_reactor_states, env->modes->modal_reactor_states_size); - } + if (env->modes) { + _lf_handle_mode_startup_reset_reactions(env, env->startup_reactions, env->startup_reactions_size, NULL, 0, + env->modes->modal_reactor_states, env->modes->modal_reactor_states_size); + } #endif -} + } -void _lf_trigger_shutdown_reactions(environment_t* env) { - assert(env != GLOBAL_ENVIRONMENT); - for (int i = 0; i < env->shutdown_reactions_size; i++) { - if (env->shutdown_reactions[i] != NULL) { - if (env->shutdown_reactions[i]->mode != NULL) { - // Skip reactions in modes - continue; + void _lf_trigger_shutdown_reactions(environment_t * env) { + assert(env != GLOBAL_ENVIRONMENT); + for (int i = 0; i < env->shutdown_reactions_size; i++) { + if (env->shutdown_reactions[i] != NULL) { + if (env->shutdown_reactions[i]->mode != NULL) { + // Skip reactions in modes + continue; + } + _lf_trigger_reaction(env, env->shutdown_reactions[i], -1); } - _lf_trigger_reaction(env, env->shutdown_reactions[i], -1); } - } #ifdef MODAL_REACTORS - if (env->modes) { - _lf_handle_mode_shutdown_reactions(env, env->shutdown_reactions, env->shutdown_reactions_size); - } + if (env->modes) { + _lf_handle_mode_shutdown_reactions(env, env->shutdown_reactions, env->shutdown_reactions_size); + } #endif -} + } -void lf_recycle_event(environment_t* env, event_t* e) { - assert(env != GLOBAL_ENVIRONMENT); - e->base.tag = (tag_t){.time = 0LL, .microstep = 0}; - e->trigger = NULL; - e->token = NULL; + void lf_recycle_event(environment_t * env, event_t * e) { + assert(env != GLOBAL_ENVIRONMENT); + e->base.tag = (tag_t){.time = 0LL, .microstep = 0}; + e->trigger = NULL; + e->token = NULL; #ifdef FEDERATED_DECENTRALIZED - e->intended_tag = (tag_t){.time = NEVER, .microstep = 0u}; + e->intended_tag = (tag_t){.time = NEVER, .microstep = 0u}; #endif - pqueue_tag_insert(env->recycle_q, (pqueue_tag_element_t*)e); -} + pqueue_tag_insert(env->recycle_q, (pqueue_tag_element_t*)e); + } -event_t* _lf_create_dummy_events(environment_t* env, tag_t tag) { - event_t* dummy = lf_get_new_event(env); - dummy->base.tag = tag; + event_t* _lf_create_dummy_events(environment_t * env, tag_t tag) { + event_t* dummy = lf_get_new_event(env); + dummy->base.tag = tag; - dummy->trigger = NULL; - return dummy; -} + dummy->trigger = NULL; + return dummy; + } -void lf_replace_token(event_t* event, lf_token_t* token) { - if (event->token != token) { - // Free the existing token, if any. - _lf_done_using(event->token); + void lf_replace_token(event_t * event, lf_token_t * token) { + if (event->token != token) { + // Free the existing token, if any. + _lf_done_using(event->token); + } + // Replace the token with ours. + event->token = token; } - // Replace the token with ours. - event->token = token; -} -trigger_handle_t _lf_schedule_at_tag(environment_t* env, trigger_t* trigger, tag_t tag, lf_token_t* token) { - assert(env != GLOBAL_ENVIRONMENT); - tag_t current_logical_tag = env->current_tag; + trigger_handle_t _lf_schedule_at_tag(environment_t * env, trigger_t * trigger, tag_t tag, lf_token_t * token) { + assert(env != GLOBAL_ENVIRONMENT); + tag_t current_logical_tag = env->current_tag; - LF_PRINT_DEBUG("_lf_schedule_at_tag() called with tag " PRINTF_TAG " at tag " PRINTF_TAG ".", tag.time - start_time, - tag.microstep, current_logical_tag.time - start_time, current_logical_tag.microstep); - if (lf_tag_compare(tag, current_logical_tag) <= 0 && env->execution_started) { - lf_print_warning("_lf_schedule_at_tag(): requested to schedule an event at the current or past tag."); - return -1; - } + LF_PRINT_DEBUG("_lf_schedule_at_tag() called with tag " PRINTF_TAG " at tag " PRINTF_TAG ".", tag.time - start_time, + tag.microstep, current_logical_tag.time - start_time, current_logical_tag.microstep); + if (lf_tag_compare(tag, current_logical_tag) <= 0 && env->execution_started) { + lf_print_warning("_lf_schedule_at_tag(): requested to schedule an event at the current or past tag."); + return -1; + } - // Increment the reference count of the token. - if (token != NULL) { - token->ref_count++; - LF_PRINT_DEBUG("_lf_schedule_at_tag: Incremented ref_count of %p to %zu.", (void*)token, token->ref_count); - } + // Increment the reference count of the token. + if (token != NULL) { + token->ref_count++; + LF_PRINT_DEBUG("_lf_schedule_at_tag: Incremented ref_count of %p to %zu.", (void*)token, token->ref_count); + } - // Do not schedule events if the tag is after the stop tag - if (lf_is_tag_after_stop_tag(env, tag)) { - lf_print_warning("_lf_schedule_at_tag: event time is past the timeout. Discarding event."); - _lf_done_using(token); - return -1; - } + // Do not schedule events if the tag is after the stop tag + if (lf_is_tag_after_stop_tag(env, tag)) { + lf_print_warning("_lf_schedule_at_tag: event time is past the timeout. Discarding event."); + _lf_done_using(token); + return -1; + } - event_t* e = lf_get_new_event(env); - // Set the event time - e->base.tag = tag; + event_t* e = lf_get_new_event(env); + // Set the event time + e->base.tag = tag; - tracepoint_schedule(env, trigger, tag.time - current_logical_tag.time); + tracepoint_schedule(env, trigger, tag.time - current_logical_tag.time); - // Make sure the event points to this trigger so when it is - // dequeued, it will trigger this trigger. - e->trigger = trigger; + // Make sure the event points to this trigger so when it is + // dequeued, it will trigger this trigger. + e->trigger = trigger; - // Set the payload. - e->token = token; + // Set the payload. + e->token = token; #ifdef FEDERATED_DECENTRALIZED - // Set the intended tag - e->intended_tag = trigger->intended_tag; + // Set the intended tag + e->intended_tag = trigger->intended_tag; #endif - event_t* found = (event_t*)pqueue_tag_find_equal_same_tag(env->event_q, (pqueue_tag_element_t*)e); - if (found != NULL) { - switch (trigger->policy) { - case drop: - if (found->token != token) { - _lf_done_using(token); - } - lf_recycle_event(env, e); - return (0); - break; - case replace: - // Replace the payload of the event at the head with our - // current payload. - lf_replace_token(found, token); - lf_recycle_event(env, e); - return 0; - break; - default: - // Adding a microstep to the original - // intended tag. - tag.microstep++; - e->base.tag = tag; - if (lf_is_tag_after_stop_tag(env, (tag_t){.time = tag.time, .microstep = tag.microstep})) { - // Scheduling e will incur a microstep after the stop tag, - // which is illegal. + event_t* found = (event_t*)pqueue_tag_find_equal_same_tag(env->event_q, (pqueue_tag_element_t*)e); + if (found != NULL) { + switch (trigger->policy) { + case drop: + if (found->token != token) { + _lf_done_using(token); + } + lf_recycle_event(env, e); + return (0); + break; + case replace: + // Replace the payload of the event at the head with our + // current payload. + lf_replace_token(found, token); lf_recycle_event(env, e); return 0; + break; + default: + // Adding a microstep to the original + // intended tag. + tag.microstep++; + e->base.tag = tag; + if (lf_is_tag_after_stop_tag(env, (tag_t){.time = tag.time, .microstep = tag.microstep})) { + // Scheduling e will incur a microstep after the stop tag, + // which is illegal. + lf_recycle_event(env, e); + return 0; + } } } + pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e); + trigger_handle_t return_value = env->_lf_handle++; + if (env->_lf_handle < 0) { + env->_lf_handle = 1; + } + return return_value; } - pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e); - trigger_handle_t return_value = env->_lf_handle++; - if (env->_lf_handle < 0) { - env->_lf_handle = 1; - } - return return_value; -} -trigger_handle_t _lf_insert_reactions_for_trigger(environment_t* env, trigger_t* trigger, lf_token_t* token) { - assert(env != GLOBAL_ENVIRONMENT); - // The trigger argument could be null, meaning that nothing is triggered. - // Doing this after incrementing the reference count ensures that the - // payload will be freed, if there is one. - if (trigger == NULL) { - lf_print_warning("_lf_schedule_init_reactions() called with a NULL trigger"); - _lf_done_using(token); - return 0; - } + trigger_handle_t _lf_insert_reactions_for_trigger(environment_t * env, trigger_t * trigger, lf_token_t * token) { + assert(env != GLOBAL_ENVIRONMENT); + // The trigger argument could be null, meaning that nothing is triggered. + // Doing this after incrementing the reference count ensures that the + // payload will be freed, if there is one. + if (trigger == NULL) { + lf_print_warning("_lf_schedule_init_reactions() called with a NULL trigger"); + _lf_done_using(token); + return 0; + } - // Check to see if the trigger is not a timer - // and not a physical action - if (trigger->is_timer || trigger->is_physical) { - lf_print_warning("_lf_schedule_init_reactions() called on a timer or physical action."); - return 0; - } + // Check to see if the trigger is not a timer + // and not a physical action + if (trigger->is_timer || trigger->is_physical) { + lf_print_warning("_lf_schedule_init_reactions() called on a timer or physical action."); + return 0; + } #ifdef MODAL_REACTORS - // If this trigger is associated with an inactive mode, it should not trigger any reaction. - if (!_lf_mode_is_active(trigger->mode)) { - LF_PRINT_DEBUG("Suppressing reactions of trigger due inactivity of mode %s.", trigger->mode->name); - return 1; - } + // If this trigger is associated with an inactive mode, it should not trigger any reaction. + if (!_lf_mode_is_active(trigger->mode)) { + LF_PRINT_DEBUG("Suppressing reactions of trigger due inactivity of mode %s.", trigger->mode->name); + return 1; + } #endif - // Check if the trigger has violated the STP offset - bool is_STP_violated = false; + // Check if the trigger has violated the STP offset + bool is_STP_violated = false; #ifdef FEDERATED - if (lf_tag_compare(trigger->intended_tag, env->current_tag) < 0) { - is_STP_violated = true; - } + if (lf_tag_compare(trigger->intended_tag, env->current_tag) < 0) { + is_STP_violated = true; + } #ifdef FEDERATED_CENTRALIZED - // Check for STP violation in the centralized coordination, which is a - // critical error. - if (is_STP_violated) { - lf_print_error_and_exit( - "Attempted to insert reactions for a trigger that had an intended tag that was in the past. " - "This should not happen under centralized coordination. Intended tag: " PRINTF_TAG ". Current tag: " PRINTF_TAG - ").", - trigger->intended_tag.time - lf_time_start(), trigger->intended_tag.microstep, lf_time_logical_elapsed(env), - env->current_tag.microstep); - } + // Check for STP violation in the centralized coordination, which is a + // critical error. + if (is_STP_violated) { + lf_print_error_and_exit( + "Attempted to insert reactions for a trigger that had an intended tag that was in the past. " + "This should not happen under centralized coordination. Intended tag: " PRINTF_TAG + ". Current tag: " PRINTF_TAG ").", + trigger->intended_tag.time - lf_time_start(), trigger->intended_tag.microstep, lf_time_logical_elapsed(env), + env->current_tag.microstep); + } #endif #endif - // Copy the token pointer into the trigger struct so that the - // reactions can access it. This overwrites the previous template token, - // for which we decrement the reference count. - _lf_replace_template_token((token_template_t*)trigger, token); + // Copy the token pointer into the trigger struct so that the + // reactions can access it. This overwrites the previous template token, + // for which we decrement the reference count. + _lf_replace_template_token((token_template_t*)trigger, token); - // Mark the trigger present. - trigger->status = present; + // Mark the trigger present. + trigger->status = present; - // Push the corresponding reactions for this trigger - // onto the reaction queue. - for (int i = 0; i < trigger->number_of_reactions; i++) { - reaction_t* reaction = trigger->reactions[i]; + // Push the corresponding reactions for this trigger + // onto the reaction queue. + for (int i = 0; i < trigger->number_of_reactions; i++) { + reaction_t* reaction = trigger->reactions[i]; #ifdef MODAL_REACTORS - // Check if reaction is disabled by mode inactivity - if (!_lf_mode_is_active(reaction->mode)) { - LF_PRINT_DEBUG("Suppressing reaction %s due inactivity of mode %s.", reaction->name, reaction->mode->name); - continue; // Suppress reaction by preventing entering reaction queue - } + // Check if reaction is disabled by mode inactivity + if (!_lf_mode_is_active(reaction->mode)) { + LF_PRINT_DEBUG("Suppressing reaction %s due inactivity of mode %s.", reaction->name, reaction->mode->name); + continue; // Suppress reaction by preventing entering reaction queue + } #endif - // Do not enqueue this reaction twice. - if (reaction->status == inactive) { - reaction->is_STP_violated = is_STP_violated; - _lf_trigger_reaction(env, reaction, -1); - LF_PRINT_LOG("Enqueued reaction %s at time " PRINTF_TIME ".", reaction->name, lf_time_logical(env)); + // Do not enqueue this reaction twice. + if (reaction->status == inactive) { + reaction->is_STP_violated = is_STP_violated; + _lf_trigger_reaction(env, reaction, -1); + LF_PRINT_LOG("Enqueued reaction %s at time " PRINTF_TIME ".", reaction->name, lf_time_logical(env)); + } } - } - return 1; -} + return 1; + } -void _lf_advance_tag(environment_t* env, tag_t next_tag) { - assert(env != GLOBAL_ENVIRONMENT); + void _lf_advance_tag(environment_t * env, tag_t next_tag) { + assert(env != GLOBAL_ENVIRONMENT); // FIXME: The following checks that _lf_advance_tag() // is being called correctly. Namely, check if logical time @@ -638,569 +634,569 @@ void _lf_advance_tag(environment_t* env, tag_t next_tag) { // be a need for a target property that enables these kinds of logic // assertions for development purposes only. #ifndef NDEBUG - event_t* next_event = (event_t*)pqueue_tag_peek(env->event_q); - if (next_event != NULL) { - if (lf_tag_compare(next_tag, next_event->base.tag) > 0) { - lf_print_error_and_exit("_lf_advance_tag(): Attempted to move tag to " PRINTF_TAG ", which is " - "past the head of the event queue, " PRINTF_TAG ".", - next_tag.time - start_time, next_tag.microstep, next_event->base.tag.time - start_time, - next_event->base.tag.microstep); + event_t* next_event = (event_t*)pqueue_tag_peek(env->event_q); + if (next_event != NULL) { + if (lf_tag_compare(next_tag, next_event->base.tag) > 0) { + lf_print_error_and_exit("_lf_advance_tag(): Attempted to move tag to " PRINTF_TAG ", which is " + "past the head of the event queue, " PRINTF_TAG ".", + next_tag.time - start_time, next_tag.microstep, next_event->base.tag.time - start_time, + next_event->base.tag.microstep); + } } - } #endif - if (lf_tag_compare(env->current_tag, next_tag) < 0) { - env->current_tag = next_tag; - } else { - lf_print_error_and_exit("_lf_advance_tag(): Attempted to move (elapsed) tag to " PRINTF_TAG ", which is " - "earlier than or equal to the (elapsed) current tag, " PRINTF_TAG ".", - next_tag.time - start_time, next_tag.microstep, env->current_tag.time - start_time, - env->current_tag.microstep); + if (lf_tag_compare(env->current_tag, next_tag) < 0) { + env->current_tag = next_tag; + } else { + lf_print_error_and_exit("_lf_advance_tag(): Attempted to move (elapsed) tag to " PRINTF_TAG ", which is " + "earlier than or equal to the (elapsed) current tag, " PRINTF_TAG ".", + next_tag.time - start_time, next_tag.microstep, env->current_tag.time - start_time, + env->current_tag.microstep); + } + LF_PRINT_LOG("Advanced (elapsed) tag to " PRINTF_TAG " at physical time " PRINTF_TIME, next_tag.time - start_time, + env->current_tag.microstep, lf_time_physical_elapsed()); } - LF_PRINT_LOG("Advanced (elapsed) tag to " PRINTF_TAG " at physical time " PRINTF_TIME, next_tag.time - start_time, - env->current_tag.microstep, lf_time_physical_elapsed()); -} -/** + /** - * Invoke the given reaction - * - * @param env Environment in which we are executing. - * @param reaction The reaction that has just executed. - * @param worker The thread number of the worker thread or 0 for single-threaded execution (for tracing). - */ -void _lf_invoke_reaction(environment_t* env, reaction_t* reaction, int worker) { - assert(env != GLOBAL_ENVIRONMENT); + * Invoke the given reaction + * + * @param env Environment in which we are executing. + * @param reaction The reaction that has just executed. + * @param worker The thread number of the worker thread or 0 for single-threaded execution (for tracing). + */ + void _lf_invoke_reaction(environment_t * env, reaction_t * reaction, int worker) { + assert(env != GLOBAL_ENVIRONMENT); #if !defined(LF_SINGLE_THREADED) - if (((self_base_t*)reaction->self)->reactor_mutex != NULL) { - LF_MUTEX_LOCK((lf_mutex_t*)((self_base_t*)reaction->self)->reactor_mutex); - } + if (((self_base_t*)reaction->self)->reactor_mutex != NULL) { + LF_MUTEX_LOCK((lf_mutex_t*)((self_base_t*)reaction->self)->reactor_mutex); + } #endif - tracepoint_reaction_starts(env, reaction, worker); - ((self_base_t*)reaction->self)->executing_reaction = reaction; - reaction->function(reaction->self); - ((self_base_t*)reaction->self)->executing_reaction = NULL; - tracepoint_reaction_ends(env, reaction, worker); + tracepoint_reaction_starts(env, reaction, worker); + ((self_base_t*)reaction->self)->executing_reaction = reaction; + reaction->function(reaction->self); + ((self_base_t*)reaction->self)->executing_reaction = NULL; + tracepoint_reaction_ends(env, reaction, worker); #if !defined(LF_SINGLE_THREADED) - if (((self_base_t*)reaction->self)->reactor_mutex != NULL) { - LF_MUTEX_UNLOCK((lf_mutex_t*)((self_base_t*)reaction->self)->reactor_mutex); - } + if (((self_base_t*)reaction->self)->reactor_mutex != NULL) { + LF_MUTEX_UNLOCK((lf_mutex_t*)((self_base_t*)reaction->self)->reactor_mutex); + } #endif -} - -/** - * For the specified reaction, if it has produced outputs, insert the - * resulting triggered reactions into the reaction queue. - * This procedure assumes the mutex lock is NOT held and grabs - * the lock only when it actually inserts something onto the reaction queue. - * @param env Environment in which we are executing. - * @param reaction The reaction that has just executed. - * @param worker The thread number of the worker thread or 0 for single-threaded execution (for tracing). - */ -void schedule_output_reactions(environment_t* env, reaction_t* reaction, int worker) { - assert(env != GLOBAL_ENVIRONMENT); + } - // If the reaction produced outputs, put the resulting triggered - // reactions into the reaction queue. As an optimization, if exactly one - // downstream reaction is enabled by this reaction, then it may be - // executed immediately in this same thread - // without going through the reaction queue. - reaction_t* downstream_to_execute_now = NULL; - int num_downstream_reactions = 0; + /** + * For the specified reaction, if it has produced outputs, insert the + * resulting triggered reactions into the reaction queue. + * This procedure assumes the mutex lock is NOT held and grabs + * the lock only when it actually inserts something onto the reaction queue. + * @param env Environment in which we are executing. + * @param reaction The reaction that has just executed. + * @param worker The thread number of the worker thread or 0 for single-threaded execution (for tracing). + */ + void schedule_output_reactions(environment_t * env, reaction_t * reaction, int worker) { + assert(env != GLOBAL_ENVIRONMENT); + + // If the reaction produced outputs, put the resulting triggered + // reactions into the reaction queue. As an optimization, if exactly one + // downstream reaction is enabled by this reaction, then it may be + // executed immediately in this same thread + // without going through the reaction queue. + reaction_t* downstream_to_execute_now = NULL; + int num_downstream_reactions = 0; #ifdef FEDERATED_DECENTRALIZED // Only pass down STP violation for federated programs that use decentralized // coordination. - // Extract the inherited STP violation - bool inherited_STP_violation = reaction->is_STP_violated; - LF_PRINT_DEBUG("Reaction %s has STP violation status: %d.", reaction->name, reaction->is_STP_violated); + // Extract the inherited STP violation + bool inherited_STP_violation = reaction->is_STP_violated; + LF_PRINT_DEBUG("Reaction %s has STP violation status: %d.", reaction->name, reaction->is_STP_violated); #endif - LF_PRINT_DEBUG("There are %zu outputs from reaction %s.", reaction->num_outputs, reaction->name); - for (size_t i = 0; i < reaction->num_outputs; i++) { - if (reaction->output_produced[i] != NULL && *(reaction->output_produced[i])) { - LF_PRINT_DEBUG("Output %zu has been produced.", i); - trigger_t** triggerArray = (reaction->triggers)[i]; - LF_PRINT_DEBUG("There are %d trigger arrays associated with output %zu.", reaction->triggered_sizes[i], i); - for (int j = 0; j < reaction->triggered_sizes[i]; j++) { - trigger_t* trigger = triggerArray[j]; - if (trigger != NULL) { - LF_PRINT_DEBUG("Trigger %p lists %d reactions.", (void*)trigger, trigger->number_of_reactions); - for (int k = 0; k < trigger->number_of_reactions; k++) { - reaction_t* downstream_reaction = trigger->reactions[k]; + LF_PRINT_DEBUG("There are %zu outputs from reaction %s.", reaction->num_outputs, reaction->name); + for (size_t i = 0; i < reaction->num_outputs; i++) { + if (reaction->output_produced[i] != NULL && *(reaction->output_produced[i])) { + LF_PRINT_DEBUG("Output %zu has been produced.", i); + trigger_t** triggerArray = (reaction->triggers)[i]; + LF_PRINT_DEBUG("There are %d trigger arrays associated with output %zu.", reaction->triggered_sizes[i], i); + for (int j = 0; j < reaction->triggered_sizes[i]; j++) { + trigger_t* trigger = triggerArray[j]; + if (trigger != NULL) { + LF_PRINT_DEBUG("Trigger %p lists %d reactions.", (void*)trigger, trigger->number_of_reactions); + for (int k = 0; k < trigger->number_of_reactions; k++) { + reaction_t* downstream_reaction = trigger->reactions[k]; #ifdef FEDERATED_DECENTRALIZED // Only pass down tardiness for federated LF programs - // Set the is_STP_violated for the downstream reaction - if (downstream_reaction != NULL) { - downstream_reaction->is_STP_violated = inherited_STP_violation; - LF_PRINT_DEBUG("Passing is_STP_violated of %d to the downstream reaction: %s", - downstream_reaction->is_STP_violated, downstream_reaction->name); - } + // Set the is_STP_violated for the downstream reaction + if (downstream_reaction != NULL) { + downstream_reaction->is_STP_violated = inherited_STP_violation; + LF_PRINT_DEBUG("Passing is_STP_violated of %d to the downstream reaction: %s", + downstream_reaction->is_STP_violated, downstream_reaction->name); + } #endif - if (downstream_reaction != NULL && downstream_reaction != downstream_to_execute_now) { - num_downstream_reactions++; - // If there is exactly one downstream reaction that is enabled by this - // reaction, then we can execute that reaction immediately without - // going through the reaction queue. In multithreaded execution, this - // avoids acquiring a mutex lock. - // FIXME: Check the earliest deadline on the reaction queue. - // This optimization could violate EDF scheduling otherwise. - if (num_downstream_reactions == 1 && downstream_reaction->last_enabling_reaction == reaction) { - // So far, this downstream reaction is a candidate to execute now. - downstream_to_execute_now = downstream_reaction; - } else { - // If there is a previous candidate reaction to execute now, - // it is no longer a candidate. - if (downstream_to_execute_now != NULL) { - // More than one downstream reaction is enabled. - // In this case, if we were to execute the downstream reaction - // immediately without changing any queues, then the second - // downstream reaction would be blocked because this reaction - // remains on the executing queue. Hence, the optimization - // is not valid. Put the candidate reaction on the queue. - _lf_trigger_reaction(env, downstream_to_execute_now, worker); - downstream_to_execute_now = NULL; + if (downstream_reaction != NULL && downstream_reaction != downstream_to_execute_now) { + num_downstream_reactions++; + // If there is exactly one downstream reaction that is enabled by this + // reaction, then we can execute that reaction immediately without + // going through the reaction queue. In multithreaded execution, this + // avoids acquiring a mutex lock. + // FIXME: Check the earliest deadline on the reaction queue. + // This optimization could violate EDF scheduling otherwise. + if (num_downstream_reactions == 1 && downstream_reaction->last_enabling_reaction == reaction) { + // So far, this downstream reaction is a candidate to execute now. + downstream_to_execute_now = downstream_reaction; + } else { + // If there is a previous candidate reaction to execute now, + // it is no longer a candidate. + if (downstream_to_execute_now != NULL) { + // More than one downstream reaction is enabled. + // In this case, if we were to execute the downstream reaction + // immediately without changing any queues, then the second + // downstream reaction would be blocked because this reaction + // remains on the executing queue. Hence, the optimization + // is not valid. Put the candidate reaction on the queue. + _lf_trigger_reaction(env, downstream_to_execute_now, worker); + downstream_to_execute_now = NULL; + } + // Queue the reaction. + _lf_trigger_reaction(env, downstream_reaction, worker); } - // Queue the reaction. - _lf_trigger_reaction(env, downstream_reaction, worker); } } } } } } - } - if (downstream_to_execute_now != NULL) { - LF_PRINT_LOG("Worker %d: Optimizing and executing downstream reaction now: %s", worker, - downstream_to_execute_now->name); - bool violation = false; + if (downstream_to_execute_now != NULL) { + LF_PRINT_LOG("Worker %d: Optimizing and executing downstream reaction now: %s", worker, + downstream_to_execute_now->name); + bool violation = false; #ifdef FEDERATED_DECENTRALIZED // Only use the STP handler for federated programs that use decentralized coordination - // If the is_STP_violated for the reaction is true, - // an input trigger to this reaction has been triggered at a later - // logical time than originally anticipated. In this case, a special - // STP handler will be invoked. - // FIXME: Note that the STP handler will be invoked - // at most once per logical time value. If the STP handler triggers the - // same reaction at the current time value, even if at a future superdense time, - // then the reaction will be invoked and the STP handler will not be invoked again. - // However, input ports to a federate reactor are network port types so this possibly should - // be disallowed. - // @note The STP handler and the deadline handler are not mutually exclusive. - // In other words, both can be invoked for a reaction if it is triggered late - // in logical time (STP offset is violated) and also misses the constraint on - // physical time (deadline). - // @note In absence of a STP handler, the is_STP_violated will be passed down the reaction - // chain until it is dealt with in a downstream STP handler. - if (downstream_to_execute_now->is_STP_violated == true) { - // Tardiness has occurred - LF_PRINT_LOG("Event has STP violation."); - reaction_function_t handler = downstream_to_execute_now->STP_handler; - // Invoke the STP handler if there is one. - if (handler != NULL) { - // There is a violation and it is being handled here - // If there is no STP handler, pass the is_STP_violated - // to downstream reactions. - violation = true; - LF_PRINT_LOG("Invoke tardiness handler."); - (*handler)(downstream_to_execute_now->self); - - // If the reaction produced outputs, put the resulting - // triggered reactions into the queue or execute them directly if possible. - schedule_output_reactions(env, downstream_to_execute_now, worker); - - // Reset the tardiness because it has been dealt with in the - // STP handler - downstream_to_execute_now->is_STP_violated = false; - LF_PRINT_DEBUG("Reset reaction's is_STP_violated field to false: %s", downstream_to_execute_now->name); - } - } -#endif - if (downstream_to_execute_now->deadline >= 0LL) { - // Get the current physical time. - instant_t physical_time = lf_time_physical(); - // Check for deadline violation. - if (downstream_to_execute_now->deadline == 0 || - physical_time > env->current_tag.time + downstream_to_execute_now->deadline) { - // Deadline violation has occurred. - tracepoint_reaction_deadline_missed(env, downstream_to_execute_now, worker); - violation = true; - // Invoke the local handler, if there is one. - reaction_function_t handler = downstream_to_execute_now->deadline_violation_handler; + // If the is_STP_violated for the reaction is true, + // an input trigger to this reaction has been triggered at a later + // logical time than originally anticipated. In this case, a special + // STP handler will be invoked. + // FIXME: Note that the STP handler will be invoked + // at most once per logical time value. If the STP handler triggers the + // same reaction at the current time value, even if at a future superdense time, + // then the reaction will be invoked and the STP handler will not be invoked again. + // However, input ports to a federate reactor are network port types so this possibly should + // be disallowed. + // @note The STP handler and the deadline handler are not mutually exclusive. + // In other words, both can be invoked for a reaction if it is triggered late + // in logical time (STP offset is violated) and also misses the constraint on + // physical time (deadline). + // @note In absence of a STP handler, the is_STP_violated will be passed down the reaction + // chain until it is dealt with in a downstream STP handler. + if (downstream_to_execute_now->is_STP_violated == true) { + // Tardiness has occurred + LF_PRINT_LOG("Event has STP violation."); + reaction_function_t handler = downstream_to_execute_now->STP_handler; + // Invoke the STP handler if there is one. if (handler != NULL) { - // Assume the mutex is still not held. + // There is a violation and it is being handled here + // If there is no STP handler, pass the is_STP_violated + // to downstream reactions. + violation = true; + LF_PRINT_LOG("Invoke tardiness handler."); (*handler)(downstream_to_execute_now->self); // If the reaction produced outputs, put the resulting // triggered reactions into the queue or execute them directly if possible. schedule_output_reactions(env, downstream_to_execute_now, worker); + + // Reset the tardiness because it has been dealt with in the + // STP handler + downstream_to_execute_now->is_STP_violated = false; + LF_PRINT_DEBUG("Reset reaction's is_STP_violated field to false: %s", downstream_to_execute_now->name); } } - } - if (!violation) { - // Invoke the downstream_reaction function. - _lf_invoke_reaction(env, downstream_to_execute_now, worker); +#endif + if (downstream_to_execute_now->deadline >= 0LL) { + // Get the current physical time. + instant_t physical_time = lf_time_physical(); + // Check for deadline violation. + if (downstream_to_execute_now->deadline == 0 || + physical_time > env->current_tag.time + downstream_to_execute_now->deadline) { + // Deadline violation has occurred. + tracepoint_reaction_deadline_missed(env, downstream_to_execute_now, worker); + violation = true; + // Invoke the local handler, if there is one. + reaction_function_t handler = downstream_to_execute_now->deadline_violation_handler; + if (handler != NULL) { + // Assume the mutex is still not held. + (*handler)(downstream_to_execute_now->self); + + // If the reaction produced outputs, put the resulting + // triggered reactions into the queue or execute them directly if possible. + schedule_output_reactions(env, downstream_to_execute_now, worker); + } + } + } + if (!violation) { + // Invoke the downstream_reaction function. + _lf_invoke_reaction(env, downstream_to_execute_now, worker); - // If the downstream_reaction produced outputs, put the resulting triggered - // reactions into the queue (or execute them directly, if possible). - schedule_output_reactions(env, downstream_to_execute_now, worker); - } + // If the downstream_reaction produced outputs, put the resulting triggered + // reactions into the queue (or execute them directly, if possible). + schedule_output_reactions(env, downstream_to_execute_now, worker); + } - // Reset the is_STP_violated because it has been passed - // down the chain - downstream_to_execute_now->is_STP_violated = false; - LF_PRINT_DEBUG("Finally, reset reaction's is_STP_violated field to false: %s", downstream_to_execute_now->name); + // Reset the is_STP_violated because it has been passed + // down the chain + downstream_to_execute_now->is_STP_violated = false; + LF_PRINT_DEBUG("Finally, reset reaction's is_STP_violated field to false: %s", downstream_to_execute_now->name); + } } -} -/** - * Print a usage message. - * TODO: This is not necessary for NO_TTY - */ -void usage(int argc, const char* argv[]) { - printf("\nCommand-line arguments: \n\n"); - printf(" -f, --fast [true | false]\n"); - printf(" Whether to wait for physical time to match logical time.\n\n"); - printf(" -o, --timeout \n"); - printf(" Stop after the specified amount of logical time, where units are one of\n"); - printf(" nsec, usec, msec, sec, minute, hour, day, week, or the plurals of those.\n\n"); - printf(" -k, --keepalive\n"); - printf(" Whether continue execution even when there are no events to process.\n\n"); - printf(" -w, --workers \n"); - printf(" Executed in threads if possible (optional feature).\n\n"); - printf(" -i, --id \n"); - printf(" The ID of the federation that this reactor will join.\n\n"); + /** + * Print a usage message. + * TODO: This is not necessary for NO_TTY + */ + void usage(int argc, const char* argv[]) { + printf("\nCommand-line arguments: \n\n"); + printf(" -f, --fast [true | false]\n"); + printf(" Whether to wait for physical time to match logical time.\n\n"); + printf(" -o, --timeout \n"); + printf(" Stop after the specified amount of logical time, where units are one of\n"); + printf(" nsec, usec, msec, sec, minute, hour, day, week, or the plurals of those.\n\n"); + printf(" -k, --keepalive\n"); + printf(" Whether continue execution even when there are no events to process.\n\n"); + printf(" -w, --workers \n"); + printf(" Executed in threads if possible (optional feature).\n\n"); + printf(" -i, --id \n"); + printf(" The ID of the federation that this reactor will join.\n\n"); #ifdef FEDERATED - printf(" -r, --rti \n"); - printf(" The address of the RTI, which can be in the form of user@host:port or ip:port.\n\n"); - printf(" -l\n"); - printf(" Send stdout to individual log files for each federate.\n\n"); + printf(" -r, --rti \n"); + printf(" The address of the RTI, which can be in the form of user@host:port or ip:port.\n\n"); + printf(" -l\n"); + printf(" Send stdout to individual log files for each federate.\n\n"); #endif - printf("Command given:\n"); - for (int i = 0; i < argc; i++) { - printf("%s ", argv[i]); + printf("Command given:\n"); + for (int i = 0; i < argc; i++) { + printf("%s ", argv[i]); + } + printf("\n\n"); } - printf("\n\n"); -} -// Some options given in the target directive are provided here as -// default command-line options. -int default_argc = 0; -const char** default_argv = NULL; - -/** - * Process the command-line arguments. If the command line arguments are not - * understood, then print a usage message and return 0. Otherwise, return 1. - * @return 1 if the arguments processed successfully, 0 otherwise. - * TODO: Not necessary for NO_TTY - */ -int process_args(int argc, const char* argv[]) { - int i = 1; - while (i < argc) { - const char* arg = argv[i++]; - if (strcmp(arg, "-f") == 0 || strcmp(arg, "--fast") == 0) { - if (argc < i + 1) { - lf_print_error("--fast needs a boolean."); - usage(argc, argv); - return 0; - } - const char* fast_spec = argv[i++]; - if (strcmp(fast_spec, "true") == 0) { - fast = true; - } else if (strcmp(fast_spec, "false") == 0) { - fast = false; - } else { - lf_print_error("Invalid value for --fast: %s", fast_spec); - } - } else if (strcmp(arg, "-o") == 0 || strcmp(arg, "--timeout") == 0 || strcmp(arg, "-timeout") == 0) { - // Tolerate -timeout for legacy uses. - if (argc < i + 2) { - lf_print_error("--timeout needs time and units."); - usage(argc, argv); - return 0; - } - const char* time_spec = argv[i++]; - const char* units = argv[i++]; + // Some options given in the target directive are provided here as + // default command-line options. + int default_argc = 0; + const char** default_argv = NULL; + + /** + * Process the command-line arguments. If the command line arguments are not + * understood, then print a usage message and return 0. Otherwise, return 1. + * @return 1 if the arguments processed successfully, 0 otherwise. + * TODO: Not necessary for NO_TTY + */ + int process_args(int argc, const char* argv[]) { + int i = 1; + while (i < argc) { + const char* arg = argv[i++]; + if (strcmp(arg, "-f") == 0 || strcmp(arg, "--fast") == 0) { + if (argc < i + 1) { + lf_print_error("--fast needs a boolean."); + usage(argc, argv); + return 0; + } + const char* fast_spec = argv[i++]; + if (strcmp(fast_spec, "true") == 0) { + fast = true; + } else if (strcmp(fast_spec, "false") == 0) { + fast = false; + } else { + lf_print_error("Invalid value for --fast: %s", fast_spec); + } + } else if (strcmp(arg, "-o") == 0 || strcmp(arg, "--timeout") == 0 || strcmp(arg, "-timeout") == 0) { + // Tolerate -timeout for legacy uses. + if (argc < i + 2) { + lf_print_error("--timeout needs time and units."); + usage(argc, argv); + return 0; + } + const char* time_spec = argv[i++]; + const char* units = argv[i++]; #if defined(PLATFORM_ARDUINO) - duration = atol(time_spec); + duration = atol(time_spec); #else - duration = atoll(time_spec); + duration = atoll(time_spec); #endif - // A parse error returns 0LL, so check to see whether that is what is meant. - if (duration == 0LL && strncmp(time_spec, "0", 1) != 0) { - // Parse error. - lf_print_error("Invalid time value: %s", time_spec); - usage(argc, argv); - return 0; - } - if (strncmp(units, "sec", 3) == 0) { - duration = SEC(duration); - } else if (strncmp(units, "msec", 4) == 0) { - duration = MSEC(duration); - } else if (strncmp(units, "usec", 4) == 0) { - duration = USEC(duration); - } else if (strncmp(units, "nsec", 4) == 0) { - duration = NSEC(duration); - } else if (strncmp(units, "min", 3) == 0) { - duration = MINUTE(duration); - } else if (strncmp(units, "hour", 4) == 0) { - duration = HOUR(duration); - } else if (strncmp(units, "day", 3) == 0) { - duration = DAY(duration); - } else if (strncmp(units, "week", 4) == 0) { - duration = WEEK(duration); - } else { - // Invalid units. - lf_print_error("Invalid time units: %s", units); - usage(argc, argv); - return 0; - } - } else if (strcmp(arg, "-k") == 0 || strcmp(arg, "--keepalive") == 0) { - if (argc < i + 1) { - lf_print_error("--keepalive needs a boolean."); - usage(argc, argv); - return 0; - } - const char* keep_spec = argv[i++]; - if (strcmp(keep_spec, "true") == 0) { - keepalive_specified = true; - } else if (strcmp(keep_spec, "false") == 0) { - keepalive_specified = false; - } else { - lf_print_error("Invalid value for --keepalive: %s", keep_spec); - } - } else if (strcmp(arg, "-w") == 0 || strcmp(arg, "--workers") == 0) { - if (argc < i + 1) { - lf_print_error("--workers needs an integer argument.s"); - usage(argc, argv); - return 0; - } - const char* threads_spec = argv[i++]; - int num_workers = atoi(threads_spec); - if (num_workers <= 0) { - lf_print_error("Invalid value for --workers: %s. Using 1.", threads_spec); - num_workers = 1; + // A parse error returns 0LL, so check to see whether that is what is meant. + if (duration == 0LL && strncmp(time_spec, "0", 1) != 0) { + // Parse error. + lf_print_error("Invalid time value: %s", time_spec); + usage(argc, argv); + return 0; + } + if (strncmp(units, "sec", 3) == 0) { + duration = SEC(duration); + } else if (strncmp(units, "msec", 4) == 0) { + duration = MSEC(duration); + } else if (strncmp(units, "usec", 4) == 0) { + duration = USEC(duration); + } else if (strncmp(units, "nsec", 4) == 0) { + duration = NSEC(duration); + } else if (strncmp(units, "min", 3) == 0) { + duration = MINUTE(duration); + } else if (strncmp(units, "hour", 4) == 0) { + duration = HOUR(duration); + } else if (strncmp(units, "day", 3) == 0) { + duration = DAY(duration); + } else if (strncmp(units, "week", 4) == 0) { + duration = WEEK(duration); + } else { + // Invalid units. + lf_print_error("Invalid time units: %s", units); + usage(argc, argv); + return 0; + } + } else if (strcmp(arg, "-k") == 0 || strcmp(arg, "--keepalive") == 0) { + if (argc < i + 1) { + lf_print_error("--keepalive needs a boolean."); + usage(argc, argv); + return 0; + } + const char* keep_spec = argv[i++]; + if (strcmp(keep_spec, "true") == 0) { + keepalive_specified = true; + } else if (strcmp(keep_spec, "false") == 0) { + keepalive_specified = false; + } else { + lf_print_error("Invalid value for --keepalive: %s", keep_spec); + } + } else if (strcmp(arg, "-w") == 0 || strcmp(arg, "--workers") == 0) { + if (argc < i + 1) { + lf_print_error("--workers needs an integer argument.s"); + usage(argc, argv); + return 0; + } + const char* threads_spec = argv[i++]; + int num_workers = atoi(threads_spec); + if (num_workers <= 0) { + lf_print_error("Invalid value for --workers: %s. Using 1.", threads_spec); + num_workers = 1; + } + _lf_number_of_workers = (unsigned int)num_workers; } - _lf_number_of_workers = (unsigned int)num_workers; - } #ifdef FEDERATED - else if (strcmp(arg, "-i") == 0 || strcmp(arg, "--id") == 0) { - if (argc < i + 1) { - lf_print_error("--id needs a string argument."); - usage(argc, argv); - return 0; - } - const char* fid = argv[i++]; - lf_set_federation_id(fid); - lf_print("Federation ID for executable %s: %s", argv[0], fid); - } else if (strcmp(arg, "-r") == 0 || strcmp(arg, "--rti") == 0) { - if (argc < i + 1) { - lf_print_error("--rti needs a string argument in the form of [user]@[host]:[port]."); - usage(argc, argv); - return 0; - } - parse_rti_code_t code = lf_parse_rti_addr(argv[i++]); - if (code != SUCCESS) { - switch (code) { - case INVALID_HOST: - lf_print_error("--rti needs a valid host"); - break; - case INVALID_PORT: - lf_print_error("--rti needs a valid port"); - break; - case INVALID_USER: - lf_print_error("--rti needs a valid user"); - break; - case FAILED_TO_PARSE: - lf_print_error("Failed to parse address of RTI"); - break; - default: - break; + else if (strcmp(arg, "-i") == 0 || strcmp(arg, "--id") == 0) { + if (argc < i + 1) { + lf_print_error("--id needs a string argument."); + usage(argc, argv); + return 0; + } + const char* fid = argv[i++]; + lf_set_federation_id(fid); + lf_print("Federation ID for executable %s: %s", argv[0], fid); + } else if (strcmp(arg, "-r") == 0 || strcmp(arg, "--rti") == 0) { + if (argc < i + 1) { + lf_print_error("--rti needs a string argument in the form of [user]@[host]:[port]."); + usage(argc, argv); + return 0; + } + parse_rti_code_t code = lf_parse_rti_addr(argv[i++]); + if (code != SUCCESS) { + switch (code) { + case INVALID_HOST: + lf_print_error("--rti needs a valid host"); + break; + case INVALID_PORT: + lf_print_error("--rti needs a valid port"); + break; + case INVALID_USER: + lf_print_error("--rti needs a valid user"); + break; + case FAILED_TO_PARSE: + lf_print_error("Failed to parse address of RTI"); + break; + default: + break; + } + usage(argc, argv); + return 0; } + } +#endif + else if (strcmp(arg, "--ros-args") == 0) { + // FIXME: Ignore ROS arguments for now + } else { + lf_print_error("Unrecognized command-line argument: %s", arg); usage(argc, argv); return 0; } } -#endif - else if (strcmp(arg, "--ros-args") == 0) { - // FIXME: Ignore ROS arguments for now - } else { - lf_print_error("Unrecognized command-line argument: %s", arg); - usage(argc, argv); - return 0; - } + return 1; } - return 1; -} /** * @brief Check that the provided version information is consistent with the * core runtime. */ #ifdef LF_TRACE -static void check_version(version_t version) { + static void check_version(version_t version) { #ifdef LF_SINGLE_THREADED - LF_ASSERT(version.build_config.single_threaded == TRIBOOL_TRUE || - version.build_config.single_threaded == TRIBOOL_DOES_NOT_MATTER, - "expected single-threaded version"); + LF_ASSERT(version.build_config.single_threaded == TRIBOOL_TRUE || + version.build_config.single_threaded == TRIBOOL_DOES_NOT_MATTER, + "expected single-threaded version"); #else - LF_ASSERT(version.build_config.single_threaded == TRIBOOL_FALSE || - version.build_config.single_threaded == TRIBOOL_DOES_NOT_MATTER, - "expected multi-threaded version"); + LF_ASSERT(version.build_config.single_threaded == TRIBOOL_FALSE || + version.build_config.single_threaded == TRIBOOL_DOES_NOT_MATTER, + "expected multi-threaded version"); #endif #ifdef NDEBUG - LF_ASSERT(version.build_config.build_type_is_debug == TRIBOOL_FALSE || - version.build_config.build_type_is_debug == TRIBOOL_DOES_NOT_MATTER, - "expected release version"); + LF_ASSERT(version.build_config.build_type_is_debug == TRIBOOL_FALSE || + version.build_config.build_type_is_debug == TRIBOOL_DOES_NOT_MATTER, + "expected release version"); #else - LF_ASSERT(version.build_config.build_type_is_debug == TRIBOOL_TRUE || - version.build_config.build_type_is_debug == TRIBOOL_DOES_NOT_MATTER, - "expected debug version"); + LF_ASSERT(version.build_config.build_type_is_debug == TRIBOOL_TRUE || + version.build_config.build_type_is_debug == TRIBOOL_DOES_NOT_MATTER, + "expected debug version"); #endif - LF_ASSERT(version.build_config.log_level == LOG_LEVEL || version.build_config.log_level == INT_MAX, - "expected log level %d", LOG_LEVEL); - // assert(!version.core_version_name || strcmp(version.core_version_name, CORE_SHA) == 0); // TODO: provide CORE_SHA -} + LF_ASSERT(version.build_config.log_level == LOG_LEVEL || version.build_config.log_level == INT_MAX, + "expected log level %d", LOG_LEVEL); + // assert(!version.core_version_name || strcmp(version.core_version_name, CORE_SHA) == 0); // TODO: provide CORE_SHA + } #endif // LF_TRACE -void initialize_global(void) { + void initialize_global(void) { #ifdef LF_TRACE - check_version(lf_version_tracing()); + check_version(lf_version_tracing()); #endif #if !defined NDEBUG - _lf_count_payload_allocations = 0; - _lf_count_token_allocations = 0; + _lf_count_payload_allocations = 0; + _lf_count_token_allocations = 0; #endif #if defined(LF_SINGLE_THREADED) - int max_threads_tracing = 1; + int max_threads_tracing = 1; #else - environment_t* envs; - int num_envs = _lf_get_environments(&envs); - int max_threads_tracing = envs[0].num_workers * num_envs + 1; // add 1 for the main thread + environment_t* envs; + int num_envs = _lf_get_environments(&envs); + int max_threads_tracing = envs[0].num_workers * num_envs + 1; // add 1 for the main thread #endif #if defined(FEDERATED) - // NUMBER_OF_FEDERATES is an upper bound on the number of upstream federates - // -- threads are spawned to listen to upstream federates. Add 1 for the - // clock sync thread and add 1 for the staa thread - max_threads_tracing += NUMBER_OF_FEDERATES + 2; - lf_tracing_global_init("federate__", FEDERATE_ID, max_threads_tracing); + // NUMBER_OF_FEDERATES is an upper bound on the number of upstream federates + // -- threads are spawned to listen to upstream federates. Add 1 for the + // clock sync thread and add 1 for the staa thread + max_threads_tracing += NUMBER_OF_FEDERATES + 2; + lf_tracing_global_init("federate__", FEDERATE_ID, max_threads_tracing); #else - lf_tracing_global_init("trace_", 0, max_threads_tracing); + lf_tracing_global_init("trace_", 0, max_threads_tracing); #endif - // Call the code-generated function to initialize all actions, timers, and ports - // This is done for all environments/enclaves at the same time. - _lf_initialize_trigger_objects(); -} + // Call the code-generated function to initialize all actions, timers, and ports + // This is done for all environments/enclaves at the same time. + _lf_initialize_trigger_objects(); + } -/** - * Flag to prevent termination function from executing twice and to signal to background - * threads to terminate. - */ -bool _lf_termination_executed = false; + /** + * Flag to prevent termination function from executing twice and to signal to background + * threads to terminate. + */ + bool _lf_termination_executed = false; + + /** Flag used to disable cleanup operations on abnormal termination. */ + bool _lf_normal_termination = false; + + /** + * Report elapsed logical and physical times and report if any + * memory allocated for tokens has not been freed. + */ + void termination(void) { + if (_lf_termination_executed) + return; + _lf_termination_executed = true; -/** Flag used to disable cleanup operations on abnormal termination. */ -bool _lf_normal_termination = false; + environment_t* env; + int num_envs = _lf_get_environments(&env); + // Invoke the code generated termination function. It terminates the federated related services. + // It should only be called for the top-level environment, which, by convention, is the first environment. + lf_terminate_execution(env); -/** - * Report elapsed logical and physical times and report if any - * memory allocated for tokens has not been freed. - */ -void termination(void) { - if (_lf_termination_executed) - return; - _lf_termination_executed = true; - - environment_t* env; - int num_envs = _lf_get_environments(&env); - // Invoke the code generated termination function. It terminates the federated related services. - // It should only be called for the top-level environment, which, by convention, is the first environment. - lf_terminate_execution(env); - - // In order to free tokens, we perform the same actions we would have for a new time step. - for (int i = 0; i < num_envs; i++) { - if (!env[i].initialized) { - lf_print_warning("---- Environment %u was never initialized", env[i].id); - continue; - } - LF_PRINT_LOG("---- Terminating environment %u, normal termination: %d", env[i].id, _lf_normal_termination); + // In order to free tokens, we perform the same actions we would have for a new time step. + for (int i = 0; i < num_envs; i++) { + if (!env[i].initialized) { + lf_print_warning("---- Environment %u was never initialized", env[i].id); + continue; + } + LF_PRINT_LOG("---- Terminating environment %u, normal termination: %d", env[i].id, _lf_normal_termination); #if !defined(LF_SINGLE_THREADED) - // Make sure all watchdog threads have stopped - _lf_watchdog_terminate_all(&env[i]); + // Make sure all watchdog threads have stopped + _lf_watchdog_terminate_all(&env[i]); #endif - // Skip most cleanup on abnormal termination. - if (_lf_normal_termination) { - _lf_start_time_step(&env[i]); + // Skip most cleanup on abnormal termination. + if (_lf_normal_termination) { + _lf_start_time_step(&env[i]); #ifdef MODAL_REACTORS - // Free events and tokens suspended by modal reactors. - _lf_terminate_modal_reactors(&env[i]); + // Free events and tokens suspended by modal reactors. + _lf_terminate_modal_reactors(&env[i]); #endif - // If the event queue still has events on it, report that. - if (env[i].event_q != NULL && pqueue_tag_size(env[i].event_q) > 0) { - lf_print_warning("---- There are %zu unprocessed future events on the event queue.", - pqueue_tag_size(env[i].event_q)); - event_t* event = (event_t*)pqueue_tag_peek(env[i].event_q); - lf_print_warning("---- The first future event has timestamp " PRINTF_TAG " after start tag.", - event->base.tag.time - start_time, event->base.tag.microstep); - } - // Print elapsed times. - // If these are negative, then the program failed to start up. - interval_t elapsed_time = lf_time_logical_elapsed(&env[i]); - if (elapsed_time >= 0LL) { - char time_buffer[29]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 - lf_comma_separated_time(time_buffer, elapsed_time); - printf("---- Elapsed logical time (in nsec): %s\n", time_buffer); - - // If start_time is 0, then execution didn't get far enough along - // to initialize this. - if (start_time > 0LL) { - lf_comma_separated_time(time_buffer, lf_time_physical_elapsed()); - printf("---- Elapsed physical time (in nsec): %s\n", time_buffer); + // If the event queue still has events on it, report that. + if (env[i].event_q != NULL && pqueue_tag_size(env[i].event_q) > 0) { + lf_print_warning("---- There are %zu unprocessed future events on the event queue.", + pqueue_tag_size(env[i].event_q)); + event_t* event = (event_t*)pqueue_tag_peek(env[i].event_q); + lf_print_warning("---- The first future event has timestamp " PRINTF_TAG " after start tag.", + event->base.tag.time - start_time, event->base.tag.microstep); + } + // Print elapsed times. + // If these are negative, then the program failed to start up. + interval_t elapsed_time = lf_time_logical_elapsed(&env[i]); + if (elapsed_time >= 0LL) { + char time_buffer[29]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, elapsed_time); + printf("---- Elapsed logical time (in nsec): %s\n", time_buffer); + + // If start_time is 0, then execution didn't get far enough along + // to initialize this. + if (start_time > 0LL) { + lf_comma_separated_time(time_buffer, lf_time_physical_elapsed()); + printf("---- Elapsed physical time (in nsec): %s\n", time_buffer); + } } } } - } - // Skip most cleanup on abnormal termination. - if (_lf_normal_termination) { - _lf_free_all_tokens(); // Must be done before freeing reactors. + // Skip most cleanup on abnormal termination. + if (_lf_normal_termination) { + _lf_free_all_tokens(); // Must be done before freeing reactors. #if !defined NDEBUG - // Issue a warning if a memory leak has been detected. - if (_lf_count_payload_allocations > 0) { - lf_print_warning("Memory allocated for messages has not been freed."); - lf_print_warning("Number of unfreed messages: %d.", _lf_count_payload_allocations); - } - if (_lf_count_token_allocations > 0) { - lf_print_warning("Memory allocated for tokens has not been freed!"); - lf_print_warning("Number of unfreed tokens: %d.", _lf_count_token_allocations); - } + // Issue a warning if a memory leak has been detected. + if (_lf_count_payload_allocations > 0) { + lf_print_warning("Memory allocated for messages has not been freed."); + lf_print_warning("Number of unfreed messages: %d.", _lf_count_payload_allocations); + } + if (_lf_count_token_allocations > 0) { + lf_print_warning("Memory allocated for tokens has not been freed!"); + lf_print_warning("Number of unfreed tokens: %d.", _lf_count_token_allocations); + } #endif #if !defined(LF_SINGLE_THREADED) - for (int i = 0; i < env->watchdogs_size; i++) { - if (env->watchdogs[i]->base->reactor_mutex != NULL) { - free(env->watchdogs[i]->base->reactor_mutex); + for (int i = 0; i < env->watchdogs_size; i++) { + if (env->watchdogs[i]->base->reactor_mutex != NULL) { + free(env->watchdogs[i]->base->reactor_mutex); + } } - } #endif - lf_free_all_reactors(); + lf_free_all_reactors(); - // Free up memory associated with environment. - // Do this last so that printed warnings don't access freed memory. - for (int i = 0; i < num_envs; i++) { - environment_free(&env[i]); - } + // Free up memory associated with environment. + // Do this last so that printed warnings don't access freed memory. + for (int i = 0; i < num_envs; i++) { + environment_free(&env[i]); + } #if defined LF_ENCLAVES - free_local_rti(); + free_local_rti(); #endif + } + lf_tracing_global_shutdown(); } - lf_tracing_global_shutdown(); -} -index_t lf_combine_deadline_and_level(interval_t deadline, int level) { - if (deadline > (interval_t)(ULLONG_MAX >> 16)) - return ((ULLONG_MAX >> 16) << 16) | level; - else - return (deadline << 16) | level; -} + index_t lf_combine_deadline_and_level(interval_t deadline, int level) { + if (deadline > (interval_t)(ULLONG_MAX >> 16)) + return ((ULLONG_MAX >> 16) << 16) | level; + else + return (deadline << 16) | level; + } diff --git a/core/tag.c b/core/tag.c index 9bb35933f..ccca19949 100644 --- a/core/tag.c +++ b/core/tag.c @@ -70,6 +70,22 @@ int lf_tag_compare(tag_t tag1, tag_t tag2) { } } +tag_t lf_tag_max(tag_t tag1, tag_t tag2) { + if (lf_tag_compare(tag1, tag2) < 0) { + return tag2; + } else { + return tag1; + } +} + +tag_t lf_tag_min(tag_t tag1, tag_t tag2) { + if (lf_tag_compare(tag1, tag2) < 0) { + return tag1; + } else { + return tag2; + } +} + tag_t lf_delay_tag(tag_t tag, interval_t interval) { if (tag.time == NEVER || interval < 0LL) return tag; diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 57f888fc2..b2e64f5d5 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -194,6 +194,9 @@ void lf_set_present(lf_port_base_t* port) { } } +// Forward declaration. See federate.h +void synchronize_with_other_federates(environment_t* env); + /** * Wait until physical time matches or exceeds the specified logical time, * unless -fast is given. For decentralized coordination, this function will diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 8b25206ff..c4e40568f 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -master \ No newline at end of file +c-enclaves diff --git a/tag/api/tag.h b/tag/api/tag.h index c903aaf53..aa0fe3554 100644 --- a/tag/api/tag.h +++ b/tag/api/tag.h @@ -116,6 +116,23 @@ tag_t lf_tag_add(tag_t a, tag_t b); */ int lf_tag_compare(tag_t tag1, tag_t tag2); +/** + * @brief Return the lesser out of two tags + * + * @param tag1 + * @param tag2 + * @return tag_t + */ +tag_t lf_tag_min(tag_t tag1, tag_t tag2); + +/** + * @brief Return the greater out of two tags + * + * @param tag1 + * @param tag2 + * @return tag_t + */ +tag_t lf_tag_max(tag_t tag1, tag_t tag2); /** * Delay a tag by the specified time interval to realize the "after" keyword. * Any interval less than 0 (including NEVER) is interpreted as "no delay",