Skip to content

Commit

Permalink
Add maxwait, start time coordination and system events (#206)
Browse files Browse the repository at this point in the history
* Handle unconnected inputs and add test

* Format

* Generate timeout handler for reactions

* Detect STP violations

* WIP

* Update grammer with never and forever keywords

* Various fixes for enabling STAA

* Rename STAA -> max-wait

* Support max-wait with no handler

* Update tests and examples

* FIx more tests

* Resolve merge issues

* max-wait -> maxwait

* Formatting

* Fix deadline code-gen

* Document intended_tag of event

* Document federated input connection

* Add federate name to the output

* Fixes

* Increase timeout to avoid flaky test

* Docs

* Formatting

* Fix RIOT test

* Fix logging

* FIx logging

* Remove mistakenly committed files

* Fix bug in connection generator

* Fix concurrency issues in TcpChannel

* Update FederatedConnection

* Format

* Fix typo in STP violation check

* Dont format kotlin code with spotless

* Format

* Add the StartupCoordinator skeleton

* WIP

* Formating

* Add proper handling of start tag

* Formatting

* Also format unit tests

* Small fixes

* Format

* Remove leader concept

* Add timeout to RIOT platform test

* Start moving to a system_event_queue

* Formatting

* Formating -> Formatting

* Port all examples to also support system_event_queue

* Formatting

* Dont call send_blocking from async context

* Remove was_ever_connected

* Formatting

* Make run script return on first error

* Update

* Fix critical section

* Update

* Fix LF_ENTRY_POINT_FEDERATE

* Fixes

* Format

* Formatting

* Cosmetics

* Docs
  • Loading branch information
erlingrj authored Feb 18, 2025
1 parent 6461a14 commit 8f6b03a
Show file tree
Hide file tree
Showing 106 changed files with 1,863 additions and 836 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ permissions:

jobs:
formating:
name: formating
name: formatting
runs-on: ubuntu-24.04
steps:
- name: Checkout
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/riot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:

- name: Run platform tests
working-directory: ${{ github.workspace }}/test/platform/riot
run: ./runAll.sh
run: timeout 120 ./runAll.sh

- name: Build examples
working-directory: ${{ github.workspace }}/examples/riot
Expand Down
7 changes: 0 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ set(TEST_COVERAGE OFF CACHE BOOL "Compute test coverage")
set(ASAN OFF CACHE BOOL "Compile with AddressSanitizer")
set(PLATFORM "POSIX" CACHE STRING "Platform to target")
set(SCHEDULER "DYNAMIC" CACHE STRING "Scheduler to use")
set(EVENT_QUEUE_SIZE 32 CACHE STRING "Static size of the event queue")
set(REACTION_QUEUE_SIZE 32 CACHE STRING "Static size of the reaction queue")
set(NETWORK_CHANNEL_TCP_POSIX OFF CACHE BOOL "Use POSIX TCP NetworkChannel")

# Code coverage setup
Expand Down Expand Up @@ -83,11 +81,6 @@ target_compile_definitions(reactor-uc PUBLIC "PLATFORM_${PLATFORM}")
# Add compile definition for scheduler used
target_compile_definitions(reactor-uc PRIVATE "SCHEDULER_${SCHEDULER}")

# Add compile definitions for event and reaction queue sizes. Has to be PUBLIC because they are used in the header files.
message(STATUS "Setting event queue size to ${EVENT_QUEUE_SIZE} and reaction queue size to ${REACTION_QUEUE_SIZE}")
target_compile_definitions(reactor-uc PUBLIC EVENT_QUEUE_SIZE=${EVENT_QUEUE_SIZE})
target_compile_definitions(reactor-uc PUBLIC REACTION_QUEUE_SIZE=${REACTION_QUEUE_SIZE})

if(NETWORK_CHANNEL_TCP_POSIX)
target_compile_definitions(reactor-uc PRIVATE NETWORK_CHANNEL_TCP_POSIX)
endif()
Expand Down
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,18 @@ asan:
make test -C build

# Format the code base
SRC_FILES := $(shell find ./src -path ./src/generated -prune -o -name '*.c' -print)
SRC_FILES := $(shell find ./src ./test/unit/ -path ./src/generated -prune -o -name '*.c' -print)
HDR_FILES := $(shell find ./include -path ./include/reactor-uc/generated -prune -o -name '*.h' -print)

format:
clang-format -i -style=file $(SRC_FILES) $(HDR_FILES)
cd lfc && ./gradlew ktfmtFormat && cd ..
cd lfc && ./gradlew spotlessApply
cd lfc && ./gradlew ktfmtFormat && ./gradlew spotlessApply && cd ..

# Check that the code base is formatted
format-check:
clang-format --dry-run --Werror -style=file $(SRC_FILES) $(HDR_FILES)
cd lfc && ./gradlew ktfmtCheck && cd ..
cd lfc && ./gradlew ktfmtCheck && ./gradlew spotlessCheck && cd ..

# Run the entire CI flow
ci: clean test coverage format-check
Expand Down
4 changes: 2 additions & 2 deletions examples/common/timer_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
LF_DEFINE_TIMER_STRUCT(TimerSource, t, 1, 0);
LF_DEFINE_TIMER_CTOR(TimerSource, t, 1, 0);
LF_DEFINE_REACTION_STRUCT(TimerSource, r, 0);
LF_DEFINE_REACTION_CTOR(TimerSource, r, 0);
LF_DEFINE_REACTION_CTOR(TimerSource, r, 0, NULL, NEVER, NULL);

typedef struct {
Reactor super;
Expand All @@ -21,4 +21,4 @@ LF_REACTOR_CTOR_SIGNATURE(TimerSource) {
LF_TIMER_REGISTER_EFFECT(self->t, self->r);
}

LF_ENTRY_POINT(TimerSource, SEC(1), false, false);
LF_ENTRY_POINT(TimerSource,32,32, SEC(1), false, false);
13 changes: 9 additions & 4 deletions examples/posix/federated/receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ lf_ret_t deserialize_msg_t(void *user_struct, const unsigned char *msg_buf, size
}

LF_DEFINE_REACTION_STRUCT(Receiver, r, 0);
LF_DEFINE_REACTION_CTOR(Receiver, r, 0);
LF_DEFINE_REACTION_CTOR(Receiver, r, 0, NULL, NEVER, NULL);
LF_DEFINE_INPUT_STRUCT(Receiver, in, 1, 0, msg_t, 0);
LF_DEFINE_INPUT_CTOR(Receiver, in, 1, 0, msg_t, 0);

Expand All @@ -36,7 +36,7 @@ LF_DEFINE_REACTION_BODY(Receiver, r) {
LF_SCOPE_SELF(Receiver);
LF_SCOPE_ENV();
LF_SCOPE_PORT(Receiver, in);
printf("Input triggered @ %" PRId64 " with %s size %d\n", env->get_elapsed_logical_time(env), in->value.msg,
printf("Input triggered @ " PRINTF_TIME " with %s size %d\n", env->get_elapsed_logical_time(env), in->value.msg,
in->value.size);
}

Expand All @@ -51,7 +51,7 @@ LF_REACTOR_CTOR_SIGNATURE_WITH_PARAMETERS(Receiver, InputExternalCtorArgs *in_ex
}

LF_DEFINE_FEDERATED_INPUT_CONNECTION_STRUCT(Receiver, in, msg_t, 5);
LF_DEFINE_FEDERATED_INPUT_CONNECTION_CTOR(Receiver, in, msg_t, 5, MSEC(100), false);
LF_DEFINE_FEDERATED_INPUT_CONNECTION_CTOR(Receiver, in, msg_t, 5, MSEC(100), false, 0);

typedef struct {
FederatedConnectionBundle super;
Expand All @@ -67,12 +67,16 @@ LF_FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Receiver, Sender) {
LF_INITIALIZE_FEDERATED_INPUT_CONNECTION(Receiver, in, deserialize_msg_t);
}

LF_DEFINE_STARTUP_COORDINATOR_STRUCT(Federate, 1);
LF_DEFINE_STARTUP_COORDINATOR_CTOR(Federate, 1, 1);

typedef struct {
Reactor super;
LF_CHILD_REACTOR_INSTANCE(Receiver, receiver, 1);
LF_FEDERATED_CONNECTION_BUNDLE_INSTANCE(Receiver, Sender);
LF_FEDERATE_BOOKKEEPING_INSTANCES(1);
LF_CHILD_INPUT_SOURCES(receiver, in, 1, 1, 0);
FederateStartupCoordinator startup_coordinator;
} MainRecv;

LF_REACTOR_CTOR_SIGNATURE(MainRecv) {
Expand All @@ -81,10 +85,11 @@ LF_REACTOR_CTOR_SIGNATURE(MainRecv) {
LF_DEFINE_CHILD_INPUT_ARGS(receiver, in, 1, 1);
LF_INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver, 1, _receiver_in_args[i]);
LF_INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Receiver, Sender);
LF_INITIALIZE_STARTUP_COORDINATOR(Federate);
lf_connect_federated_input(&self->Receiver_Sender_bundle.inputs[0]->super, &self->receiver->in[0].super);
}

LF_ENTRY_POINT_FEDERATED(MainRecv, SEC(1), true, true, 1, false)
LF_ENTRY_POINT_FEDERATED(MainRecv,32,0,32, SEC(1), true, 1)

int main() {
lf_start();
Expand Down
15 changes: 10 additions & 5 deletions examples/posix/federated/sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ typedef struct {
char msg[512];
} msg_t;

size_t serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) {
int serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) {
const msg_t *msg = user_struct;

memcpy(msg_buf, &msg->size, sizeof(msg->size));
Expand All @@ -24,7 +24,7 @@ size_t serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigne
LF_DEFINE_TIMER_STRUCT(Sender, t, 1, 0)
LF_DEFINE_TIMER_CTOR(Sender, t, 1, 0)
LF_DEFINE_REACTION_STRUCT(Sender, r, 1)
LF_DEFINE_REACTION_CTOR(Sender, r, 0)
LF_DEFINE_REACTION_CTOR(Sender, r, 0, NULL, NEVER, NULL)
LF_DEFINE_OUTPUT_STRUCT(Sender, out, 1, msg_t)
LF_DEFINE_OUTPUT_CTOR(Sender, out, 1)

Expand All @@ -41,7 +41,7 @@ LF_DEFINE_REACTION_BODY(Sender, r) {
LF_SCOPE_ENV();
LF_SCOPE_PORT(Sender, out);

printf("Timer triggered @ %" PRId64 "\n", env->get_elapsed_logical_time(env));
printf("Timer triggered @ " PRINTF_TIME "\n", env->get_elapsed_logical_time(env));
msg_t val;
strcpy(val.msg, "Hello From Sender");
val.size = sizeof("Hello From Sender");
Expand Down Expand Up @@ -78,6 +78,9 @@ LF_FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Sender, Receiver) {
LF_INITIALIZE_FEDERATED_OUTPUT_CONNECTION(Sender, out, serialize_msg_t);
}

LF_DEFINE_STARTUP_COORDINATOR_STRUCT(Federate, 1);
LF_DEFINE_STARTUP_COORDINATOR_CTOR(Federate, 1, 1);

// Reactor main
typedef struct {
Reactor super;
Expand All @@ -87,6 +90,7 @@ typedef struct {
LF_CHILD_OUTPUT_CONNECTIONS(sender, out, 1, 1, 1);
LF_CHILD_OUTPUT_EFFECTS(sender, out, 1, 1, 0);
LF_CHILD_OUTPUT_OBSERVERS(sender, out, 1, 1, 0);
FederateStartupCoordinator startup_coordinator;
} MainSender;

LF_REACTOR_CTOR_SIGNATURE(MainSender) {
Expand All @@ -95,10 +99,11 @@ LF_REACTOR_CTOR_SIGNATURE(MainSender) {
LF_DEFINE_CHILD_OUTPUT_ARGS(sender, out, 1, 1);
LF_INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Sender, sender, 1, _sender_out_args[i]);
LF_INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Sender, Receiver);
lf_connect_federated_output(self->Sender_Receiver_bundle.outputs[0], self->sender->out);
lf_connect_federated_output(&self->Sender_Receiver_bundle.outputs[0]->super, &self->sender->out[0].super);
LF_INITIALIZE_STARTUP_COORDINATOR(Federate);
}

LF_ENTRY_POINT_FEDERATED(MainSender, SEC(1), true, false, 1, true)
LF_ENTRY_POINT_FEDERATED(MainSender,32,0,32, SEC(1), true, 1)

int main() {
lf_start();
Expand Down
2 changes: 1 addition & 1 deletion examples/posix/hello/hello.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
LF_DEFINE_REACTION_BODY(TimerSource, r) {
LF_SCOPE_SELF(TimerSource);
LF_SCOPE_ENV();
printf("TimerSource World @ %"PRId64"\n", env->get_elapsed_logical_time(env));
printf("TimerSource World @ " PRINTF_TIME "\n", env->get_elapsed_logical_time(env));
}

int main() {
Expand Down
2 changes: 0 additions & 2 deletions examples/riot/blinky/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,5 @@ QUIET ?= 1

# Enable reactor-uc features
# CFLAGS += -DNETWORK_CHANNEL_TCP_POSIX
REACTION_QUEUE_SIZE = 32
EVENT_QUEUE_SIZE = 32

include $(CURDIR)/../../../make/riot/riot.mk
2 changes: 0 additions & 2 deletions examples/riot/coap_federated/receiver/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ QUIET ?= 1

# Enable reactor-uc features
CFLAGS += -DNETWORK_CHANNEL_COAP
REACTION_QUEUE_SIZE = 32
EVENT_QUEUE_SIZE = 32

CFLAGS += -DTHREAD_STACKSIZE_DEFAULT=10000
CFLAGS += -DTHREAD_STACKSIZE_MAIN=10000
Expand Down
12 changes: 8 additions & 4 deletions examples/riot/coap_federated/receiver/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ lf_ret_t deserialize_msg_t(void *user_struct, const unsigned char *msg_buf, size
}

LF_DEFINE_REACTION_STRUCT(Receiver, r, 0)
LF_DEFINE_REACTION_CTOR(Receiver, r, 0)
LF_DEFINE_REACTION_CTOR(Receiver, r, 0, NULL, NEVER, NULL)
LF_DEFINE_INPUT_STRUCT(Receiver, in, 1, 0, lf_msg_t, 0)
LF_DEFINE_INPUT_CTOR(Receiver, in, 1, 0, lf_msg_t, 0)

Expand All @@ -35,7 +35,7 @@ LF_DEFINE_REACTION_BODY(Receiver, r) {
LF_SCOPE_SELF(Receiver);
LF_SCOPE_ENV();
LF_SCOPE_PORT(Receiver, in);
printf("Input triggered @ %" PRId64 " with %s size %d\n", env->get_elapsed_logical_time(env), in->value.msg,
printf("Input triggered @ " PRINTF_TIME " with %s size %d\n", env->get_elapsed_logical_time(env), in->value.msg,
in->value.size);
}

Expand All @@ -50,7 +50,7 @@ LF_REACTOR_CTOR_SIGNATURE_WITH_PARAMETERS(Receiver, InputExternalCtorArgs *in_ex
}

LF_DEFINE_FEDERATED_INPUT_CONNECTION_STRUCT(Receiver, in, lf_msg_t, 5);
LF_DEFINE_FEDERATED_INPUT_CONNECTION_CTOR(Receiver, in, lf_msg_t, 5, MSEC(100), false);
LF_DEFINE_FEDERATED_INPUT_CONNECTION_CTOR(Receiver, in, lf_msg_t, 5, MSEC(100), false, 0);

typedef struct {
FederatedConnectionBundle super;
Expand All @@ -66,12 +66,15 @@ LF_FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Receiver, Sender) {
LF_INITIALIZE_FEDERATED_INPUT_CONNECTION(Receiver, in, deserialize_msg_t);
}

LF_DEFINE_STARTUP_COORDINATOR_STRUCT(Federate, 1);
LF_DEFINE_STARTUP_COORDINATOR_CTOR(Federate, 1, 1);
typedef struct {
Reactor super;
LF_CHILD_REACTOR_INSTANCE(Receiver, receiver, 1);
LF_FEDERATED_CONNECTION_BUNDLE_INSTANCE(Receiver, Sender);
LF_FEDERATE_BOOKKEEPING_INSTANCES(1);
LF_CHILD_INPUT_SOURCES(receiver, in, 1, 1, 0);
FederateStartupCoordinator startup_coordinator;
} MainRecv;

LF_REACTOR_CTOR_SIGNATURE(MainRecv) {
Expand All @@ -80,10 +83,11 @@ LF_REACTOR_CTOR_SIGNATURE(MainRecv) {
LF_DEFINE_CHILD_INPUT_ARGS(receiver, in, 1, 1);
LF_INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver, 1, _receiver_in_args[i]);
LF_INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Receiver, Sender);
LF_INITIALIZE_STARTUP_COORDINATOR(Federate);
lf_connect_federated_input(&self->Receiver_Sender_bundle.inputs[0]->super, &self->receiver->in[0].super);
}

LF_ENTRY_POINT_FEDERATED(MainRecv, SEC(1), true, true, 1, false)
LF_ENTRY_POINT_FEDERATED(MainRecv, 32, 0, 32, SEC(1), true, 1)

int main() {
lf_start();
Expand Down
2 changes: 0 additions & 2 deletions examples/riot/coap_federated/sender/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ QUIET ?= 1

# Enable reactor-uc features
CFLAGS += -DNETWORK_CHANNEL_COAP
REACTION_QUEUE_SIZE = 32
EVENT_QUEUE_SIZE = 32

CFLAGS += -DTHREAD_STACKSIZE_DEFAULT=10000
CFLAGS += -DTHREAD_STACKSIZE_MAIN=10000
Expand Down
10 changes: 7 additions & 3 deletions examples/riot/coap_federated/sender/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ int serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigned c
LF_DEFINE_TIMER_STRUCT(Sender, t, 1, 0)
LF_DEFINE_TIMER_CTOR(Sender, t, 1, 0)
LF_DEFINE_REACTION_STRUCT(Sender, r, 1)
LF_DEFINE_REACTION_CTOR(Sender, r, 0)
LF_DEFINE_REACTION_CTOR(Sender, r, 0, NULL, NEVER, NULL)
LF_DEFINE_OUTPUT_STRUCT(Sender, out, 1, lf_msg_t)
LF_DEFINE_OUTPUT_CTOR(Sender, out, 1)

Expand All @@ -38,7 +38,7 @@ LF_DEFINE_REACTION_BODY(Sender, r) {
LF_SCOPE_ENV();
LF_SCOPE_PORT(Sender, out);

printf("Timer triggered @ %" PRId64 "\n", env->get_elapsed_logical_time(env));
printf("Timer triggered @ " PRINTF_TIME "\n", env->get_elapsed_logical_time(env));
lf_msg_t val;
strcpy(val.msg, "Hello From Sender");
val.size = sizeof("Hello From Sender");
Expand Down Expand Up @@ -73,6 +73,8 @@ LF_FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Sender, Receiver) {
LF_INITIALIZE_FEDERATED_OUTPUT_CONNECTION(Sender, out, serialize_msg_t);
}

LF_DEFINE_STARTUP_COORDINATOR_STRUCT(Federate, 1);
LF_DEFINE_STARTUP_COORDINATOR_CTOR(Federate, 1, 1);
// Reactor main
typedef struct {
Reactor super;
Expand All @@ -82,6 +84,7 @@ typedef struct {
LF_CHILD_OUTPUT_CONNECTIONS(sender, out, 1, 1, 1);
LF_CHILD_OUTPUT_EFFECTS(sender, out, 1, 1, 0);
LF_CHILD_OUTPUT_OBSERVERS(sender, out, 1, 1, 0);
FederateStartupCoordinator startup_coordinator;
} MainSender;

LF_REACTOR_CTOR_SIGNATURE(MainSender) {
Expand All @@ -91,9 +94,10 @@ LF_REACTOR_CTOR_SIGNATURE(MainSender) {
LF_INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Sender, sender, 1, _sender_out_args[i]);
LF_INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Sender, Receiver);
lf_connect_federated_output((Connection *)self->Sender_Receiver_bundle.outputs[0], (Port *)self->sender->out);
LF_INITIALIZE_STARTUP_COORDINATOR(Federate);
}

LF_ENTRY_POINT_FEDERATED(MainSender, SEC(1), true, false, 1, true)
LF_ENTRY_POINT_FEDERATED(MainSender, SEC(1), true, 1)

int main() {
lf_start();
Expand Down
2 changes: 0 additions & 2 deletions examples/riot/hello/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,5 @@ QUIET ?= 1

# Enable reactor-uc features
# CFLAGS += -DNETWORK_CHANNEL_TCP_POSIX
REACTION_QUEUE_SIZE = 32
EVENT_QUEUE_SIZE = 32

include $(CURDIR)/../../../make/riot/riot.mk
12 changes: 8 additions & 4 deletions examples/zephyr/basic_federated/common/receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ typedef struct {
} msg_t;

LF_DEFINE_REACTION_STRUCT(Receiver, r, 0);
LF_DEFINE_REACTION_CTOR(Receiver, r, 0)
LF_DEFINE_REACTION_CTOR(Receiver, r, 0, NULL, NEVER, NULL);

LF_DEFINE_INPUT_STRUCT(Receiver, in, 1, 0, msg_t, 0)
LF_DEFINE_INPUT_CTOR(Receiver, in, 1, 0, msg_t, 0)
Expand All @@ -50,7 +50,7 @@ LF_DEFINE_REACTION_BODY(Receiver, r) {
LF_SCOPE_SELF(Receiver);
LF_SCOPE_ENV();
gpio_pin_toggle_dt(&led);
printf("Reaction triggered @ %" PRId64 " (%" PRId64 "), %" PRId64 ")\n", env->get_elapsed_logical_time(env),
printf("Reaction triggered @ " PRINTF_TIME " (" PRINTF_TIME "), " PRINTF_TIME ")\n", env->get_elapsed_logical_time(env),
env->get_logical_time(env), env->get_physical_time(env));
}

Expand All @@ -65,7 +65,7 @@ LF_REACTOR_CTOR_SIGNATURE_WITH_PARAMETERS(Receiver, InputExternalCtorArgs *in_ex
}

LF_DEFINE_FEDERATED_INPUT_CONNECTION_STRUCT(Receiver, in, msg_t, 5);
LF_DEFINE_FEDERATED_INPUT_CONNECTION_CTOR(Receiver, in, msg_t, 5, MSEC(100), false);
LF_DEFINE_FEDERATED_INPUT_CONNECTION_CTOR(Receiver, in, msg_t, 5, MSEC(100), false, 0);

typedef struct {
FederatedConnectionBundle super;
Expand All @@ -81,12 +81,15 @@ LF_FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Receiver, Sender) {
LF_INITIALIZE_FEDERATED_INPUT_CONNECTION(Receiver, in, deserialize_payload_default);
}

LF_DEFINE_STARTUP_COORDINATOR_STRUCT(Federate, 1);
LF_DEFINE_STARTUP_COORDINATOR_CTOR(Federate, 1, 1);
typedef struct {
Reactor super;
LF_CHILD_REACTOR_INSTANCE(Receiver, receiver, 1);
LF_FEDERATED_CONNECTION_BUNDLE_INSTANCE(Receiver, Sender);
LF_FEDERATE_BOOKKEEPING_INSTANCES(1);
LF_CHILD_INPUT_SOURCES(receiver, in, 1, 1, 0);
FederateStartupCoordinator startup_coordinator;
} MainRecv;

LF_REACTOR_CTOR_SIGNATURE(MainRecv) {
Expand All @@ -95,6 +98,7 @@ LF_REACTOR_CTOR_SIGNATURE(MainRecv) {
LF_DEFINE_CHILD_INPUT_ARGS(receiver, in, 1, 1);
LF_INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver, 1, _receiver_in_args[i]);
LF_INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Receiver, Sender);
LF_INITIALIZE_STARTUP_COORDINATOR(Federate);
lf_connect_federated_input(&self->Receiver_Sender_bundle.inputs[0]->super, &self->receiver->in[0].super);
}
LF_ENTRY_POINT_FEDERATED(MainRecv, FOREVER, true, true, 1, false)
LF_ENTRY_POINT_FEDERATED(MainRecv,32,0,32, FOREVER, true, 1)
Loading

0 comments on commit 8f6b03a

Please sign in to comment.