Skip to content

Commit

Permalink
iox-#9 TODO
Browse files Browse the repository at this point in the history
  • Loading branch information
elBoberido committed Aug 25, 2023
1 parent 921cda9 commit 958cc31
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 32 deletions.
31 changes: 31 additions & 0 deletions examples/publisher_for_event_based_subscriber.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project
// SPDX-FileContributor: Mathias Kraus

use iceoryx_rs::PublisherBuilder;
use iceoryx_rs::Runtime;

use std::error::Error;
use std::thread;
use std::time::Duration;

fn main() -> Result<(), Box<dyn Error>> {
Runtime::init("fibonacci");

let publisher = PublisherBuilder::<u64>::new("math", "sequence", "fibonacci").create()?;

let mut fib_current = 0u64;
let mut fib_next = 1u64;
loop {
let mut sample = publisher.loan()?;
*sample = fib_current;
publisher.publish(sample);

println!("Sending: {}", fib_current);
let fib_next_new = fib_current + fib_next;
fib_current = fib_next;
fib_next = fib_next_new;

thread::sleep(Duration::from_millis(1000));
}
}
50 changes: 41 additions & 9 deletions examples/subscriber_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use topic::Counter;

use iceoryx_rs::Runtime;
use iceoryx_rs::SubscriberBuilder;
use iceoryx_rs::SampleReceiver;
use iceoryx_rs::reactor::{self, Control, Condition, Dispatcher, Foo};

use std::error::Error;
use std::thread;
Expand All @@ -15,20 +17,50 @@ use std::time::Duration;
fn main() -> Result<(), Box<dyn Error>> {
Runtime::init("subscriber_simple");

let (subscriber, sample_receive_token) =
let (counter_subscriber, counter_sample_receive_token) =
SubscriberBuilder::<Counter>::new("Radar", "FrontLeft", "Counter")
.queue_capacity(5)
.create()?;

let sample_receiver = subscriber.get_sample_receiver(sample_receive_token);
let counter_sample_receiver = counter_subscriber.get_sample_receiver(counter_sample_receive_token);

loop {
if sample_receiver.has_data() {
while let Some(sample) = sample_receiver.take() {
println!("Receiving: {}", sample.counter);
let r = reactor::Reactor::new();
let (mut control, mut dispatcher) = r.split();
let mut token = control.attach(counter_sample_receiver, Box::new(|s| { Condition::State(s.has_data()) }), Box::new(|s|
{
while let Some(sample) = s.take() {
println!("Receiving counter: {}", sample.counter);
}
} else {
thread::sleep(Duration::from_millis(100));
}
})
);

dispatcher.add_handler(token.handler.take().unwrap());

let (fibonacci_subscriber, fibonacci_sample_receive_token) = SubscriberBuilder::<u64>::new("math", "sequence", "fibonacci").queue_capacity(5).create()?;
let fibonacci_sample_receiver = fibonacci_subscriber.get_sample_receiver(fibonacci_sample_receive_token);
let mut token = control.attach(fibonacci_sample_receiver, Box::new(|s| { Condition::State(s.has_data()) }), Box::new(|s|
{
while let Some(sample) = s.take() {
println!("Receiving fibonacci: {}", *sample);
}
})
);

dispatcher.add_handler(token.handler.take().unwrap());

while let Some(notification_index) = dispatcher.next_with_timeout(Duration::from_secs(2)) {
println!("notification index: {}", notification_index);
}

// loop {
// if sample_receiver.has_data() {
// while let Some(sample) = sample_receiver.take() {
// println!("Receiving: {}", sample.counter);
// }
// } else {
// thread::sleep(Duration::from_millis(100));
// }
// }

Ok(())
}
8 changes: 3 additions & 5 deletions iceoryx-sys/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,11 @@ impl Subscriber {
}
}

pub fn set_condition_variable(&self, condition_variable: &ConditionVariable) {
pub fn set_condition_variable(&self, condition_variable: &ConditionVariable, notification_index: u64) {
unsafe {
cpp!([self as "SubscriberPortUser*", condition_variable as "ConditionVariable*"] {
cpp!([self as "SubscriberPortUser*", condition_variable as "ConditionVariable*", notification_index as "uint64_t"] {
if(!self->isConditionVariableSet()) {
// currently the condition variable is used only for one subscriber and therefore the index is set to 0
constexpr uint64_t NOTIFICATION_INDEX{0};
self->setConditionVariable(condition_variable->data(), NOTIFICATION_INDEX);
self->setConditionVariable(condition_variable->data(), notification_index);
}
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-FileCopyrightText: © Contributors to the iceoryx-rs project
// SPDX-FileContributor: Mathias Kraus

#![warn(missing_docs)]
// #![warn(missing_docs)]

//! # iceoryx-rs
//!
Expand Down
41 changes: 37 additions & 4 deletions src/reactor/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,56 @@
use super::{Event, Handler, State};

use std::any::Any;
use std::marker::PhantomData;
use std::sync::Arc;

pub trait Foo {
fn as_any(&self) -> &dyn Any;
fn as_any_mut(&mut self) -> &mut dyn Any;

fn attach_condition_variable(&self, condition_variable: &ffi::ConditionVariable, notification_index: u64);
}

pub struct AttachToken <T> {
notification_index: u64,
pub handler: Option<Handler>, // TODO remove handler
_phantom: PhantomData<T>,
}

pub enum Condition {
Event(bool), // will be evaluated once when triggered, right before the handler is called
State(bool), // will be evaluated multiple times, right before and after the handler is called
}

pub struct Control {
condition_variable: Arc<Box<ffi::ConditionVariable>>,
notification_index: u64,
// TODO queue sender
}

impl Control {
pub(super) fn new(condition_variable: Arc<Box<ffi::ConditionVariable>>) -> Self {
Self { condition_variable }
Self { condition_variable, notification_index: 0 }
}

pub fn attach_event<T: Event>(source: Box<dyn Event>, handler: Box<Handler<T>>) {
unimplemented!()
pub fn attach<T: Foo + 'static>(&mut self, target: T, condition: Box<dyn Fn(&T) -> Condition>, mut action: Box<dyn FnMut(&mut T)>) -> AttachToken<T> {
let condition = move |t: &dyn Foo| {
unsafe { condition(t.as_any().downcast_ref::<T>().unwrap_unchecked()) }
};
let action = move |t: &mut dyn Foo| {
unsafe { action(t.as_any_mut().downcast_mut::<T>().unwrap_unchecked()); }
};

let notification_index = self.notification_index;
self.notification_index += 1;
target.attach_condition_variable(&self.condition_variable, notification_index);
let handler = Handler::new(Box::new(target), Box::new(condition), Box::new(action));

AttachToken {notification_index, handler: Some(handler), _phantom: PhantomData }
}

pub fn attach_state<T: State>(source: Box<dyn State>, handler: Box<Handler<T>>) {
pub fn detach<T: Foo>(&mut self, token: AttachToken<T>) -> T {
unimplemented!()
}
}
2 changes: 1 addition & 1 deletion src/reactor/demultiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::sync::Arc;

pub(super) struct Demultiplexer {
condition_variable: Arc<Box<ffi::ConditionVariable>>,
pub(super) condition_variable: Arc<Box<ffi::ConditionVariable>>,
// TODO queue receiver
}

Expand Down
58 changes: 58 additions & 0 deletions src/reactor/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,76 @@
//! Dispatches the events according to the registered handler
use super::Demultiplexer;
use super::Condition;
use super::Handler;

use std::sync::Arc;
use std::time::{Duration, SystemTime};

pub struct Dispatcher {
demux: Demultiplexer,
targets: [Option<Handler>; 20],
number_of_handler: u64,
// TODO queue receiver
}

impl Dispatcher {
pub(super) fn new(condition_variable: Arc<Box<ffi::ConditionVariable>>) -> Self {
Self {
demux: Demultiplexer::new(condition_variable),
targets: Default::default(),
number_of_handler: 0,
}
}

// TODO remove
pub fn add_handler(&mut self, handler: Handler) {
self.targets[self.number_of_handler as usize] = Some(handler);
self.number_of_handler += 1;
}

pub fn next_with_timeout(&mut self, timeout: Duration) -> Option<u64> {
let entry_time = SystemTime::now();
while let Some(remaining_timeout) = {
let elapsed = entry_time.elapsed().unwrap_or(timeout);
timeout.checked_sub(elapsed)
} {
self.demux.condition_variable.timed_wait(remaining_timeout);
for index in 0..self.number_of_handler {
let handler = self.targets[index as usize].as_mut().unwrap();
if let Condition::State(true) = (handler.condition)(&*handler.target) {
(handler.action)(&mut *handler.target);
}
if index == self.number_of_handler -1 { return Some(0) }
}
}

None
}

// pub fn wait_for_samples(&self, timeout: Duration) -> SampleReceiverWaitState {
// if !self.ffi_sub.as_ref().is_condition_variable_set() {
// return SampleReceiverWaitState::Stopped;
// }
// if self.has_data() {
// return SampleReceiverWaitState::SamplesAvailable;
// }
//
// let entry_time = SystemTime::now();
// while let Some(remaining_timeout) = {
// let elapsed = entry_time.elapsed().unwrap_or(timeout);
// timeout.checked_sub(elapsed)
// } {
// // self.condition_variable.timed_wait(remaining_timeout);
// // if self.has_data() {
// // return SampleReceiverWaitState::SamplesAvailable;
// // }
// }
//
// if self.ffi_sub.as_ref().is_condition_variable_set() {
// SampleReceiverWaitState::Timeout
// } else {
// SampleReceiverWaitState::Stopped
// }
// }
}
19 changes: 16 additions & 3 deletions src/reactor/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,25 @@

//! Event handler dispatched by the reactor
use std::marker::PhantomData;
use super::control::Foo;
use super::control::Condition;

pub trait Event {}

pub trait State {}

pub struct Handler<T> {
_phantom: PhantomData<T>,
pub struct Handler {
pub(super) target: Box<dyn Foo>,
pub(super) condition: Box<dyn Fn(&dyn Foo) -> Condition>,
pub(super) action: Box<dyn FnMut(&mut dyn Foo)>,
}

impl Handler {
pub (crate) fn new(target: Box<dyn Foo>,
condition: Box<dyn Fn(&dyn Foo) -> Condition>,
action: Box<dyn FnMut(&mut dyn Foo)>) -> Self

{
Self {target, condition, action}
}
}
9 changes: 8 additions & 1 deletion src/reactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::sync::Arc;

mod control;
pub use control::Control;
pub use control::Condition;
pub use control::Foo;

mod demultiplexer;
use demultiplexer::Demultiplexer;
Expand All @@ -23,7 +25,7 @@ pub use handler::Event;
pub use handler::Handler;
pub use handler::State;

struct Reactor {
pub struct Reactor {
control: Control,
dispatcher: Dispatcher,
}
Expand All @@ -39,4 +41,9 @@ impl Reactor {
dispatcher,
}
}

pub fn split(self) -> (Control, Dispatcher) {
let Reactor { control, dispatcher } = self;
(control, dispatcher)
}
}
36 changes: 28 additions & 8 deletions src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,38 @@ impl<S: ffi::SubscriberStrongRef> Sample<[u8], S> {
/// Access to the sample receiver queue of the subscriber
pub struct SampleReceiver<T: ?Sized, S: ffi::SubscriberStrongRef> {
ffi_sub: S,
condition_variable: Box<ffi::ConditionVariable>,
// condition_variable: Box<ffi::ConditionVariable>,
phantom: PhantomData<T>,
}

use crate::reactor::Foo;

use std::any::Any;

impl<T: ?Sized + 'static, S: ffi::SubscriberStrongRef + 'static> Foo for SampleReceiver<T, S> {
fn as_any(&self) -> &dyn Any {
self
}

fn as_any_mut(&mut self) -> &mut dyn Any {
self
}

fn attach_condition_variable(&self, condition_variable: &ffi::ConditionVariable, notification_index: u64) {
self.ffi_sub.as_ref().set_condition_variable(&condition_variable, notification_index);
}
}

impl<T: ?Sized, S: ffi::SubscriberStrongRef> SampleReceiver<T, S> {
pub(super) fn new(ffi_sub: S) -> Self {
let condition_variable = ffi::ConditionVariable::new();
ffi_sub.as_ref().set_condition_variable(&condition_variable);
// let condition_variable = ffi::ConditionVariable::new();
// // currently the condition variable is used only for one subscriber and therefore the index is set to 0
// let notification_index = 0;
// ffi_sub.as_ref().set_condition_variable(&condition_variable, notification_index);

SampleReceiver {
ffi_sub,
condition_variable,
// condition_variable,
phantom: PhantomData,
}
}
Expand Down Expand Up @@ -105,10 +125,10 @@ impl<T: ?Sized, S: ffi::SubscriberStrongRef> SampleReceiver<T, S> {
let elapsed = entry_time.elapsed().unwrap_or(timeout);
timeout.checked_sub(elapsed)
} {
self.condition_variable.timed_wait(remaining_timeout);
if self.has_data() {
return SampleReceiverWaitState::SamplesAvailable;
}
// self.condition_variable.timed_wait(remaining_timeout);
// if self.has_data() {
// return SampleReceiverWaitState::SamplesAvailable;
// }
}

if self.ffi_sub.as_ref().is_condition_variable_set() {
Expand Down

0 comments on commit 958cc31

Please sign in to comment.