Skip to content

Commit

Permalink
deploy: 57cb087
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Sep 10, 2023
1 parent 69fc060 commit 17cdf5e
Show file tree
Hide file tree
Showing 5 changed files with 339 additions and 9 deletions.
4 changes: 2 additions & 2 deletions ch03-00-actors.html
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ <h2 id="functional-actors"><a class="header" href="#functional-actors">Functiona
#[message(ret = u32)]
pub struct GetValue;

pub fn counter() -&gt; Schema {
pub fn counter() -&gt; Blueprint {
ActorGroup::new().exec(|mut ctx| async move {
// Private state of the actor.
let mut value = 0;
Expand Down Expand Up @@ -223,7 +223,7 @@ <h2 id="functional-actors"><a class="header" href="#functional-actors">Functiona
<pre><code class="language-rust ignore">use elfo::prelude::*;
use counter::{Increment, GetValue};

pub fn sample() -&gt; Schema {
pub fn sample() -&gt; Blueprint {
ActorGroup::new().exec(|ctx| async move {
// Increment the counter, we aren't interested in errors.
let _ = ctx.send(Increment { delta: 1 }).await;
Expand Down
167 changes: 166 additions & 1 deletion ch03-03-sources.html
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,172 @@ <h1 class="menu-title">The Actoromicon</h1>
<div id="content" class="content">
<main>
<h1 id="sources"><a class="header" href="#sources">Sources</a></h1>
<p>TODO</p>
<p>In the context of the <code>elfo</code> actor system, sources serve as conduits for integrating various streams of incoming messages, such as timers, signals, futures, and streams. They allow for a seamless amalgamation of these additional streams with the messages arriving in the mailbox. Consequently, features like tracing, telemetry, and dumps are uniformly available for both regular and source-generated messages.</p>
<p>You can instantiate sources using dedicated constructors that correspond to different types. It's important to note that initially, these sources are inactive; they only start generating messages once they are linked to the context through the method <code>ctx.attach(_)</code>. This method also returns a handler, facilitating the management of the source. Here is how you can utilize this method:</p>
<pre><code class="language-rust">let unattached_source = SomeSource::new();
let source_handle = ctx.attach(unattached_source);
</code></pre>
<p>If necessary, you can detach sources at any time by invoking the <code>handle.terminate()</code> method, as shown below:</p>
<pre><code class="language-rust">source_handle.terminate();
</code></pre>
<p>Under the hood, the storage and utilization of sources are optimized significantly to support multiple sources at the same time, thanks to the <a href="https://docs.rs/unicycle">unicycle</a> crate. While the system supports an unlimited number of sources without offering backpressure, it's essential to moderate the usage to prevent potential out-of-memory (OOM) errors.</p>
<h2 id="intervals"><a class="header" href="#intervals">Intervals</a></h2>
<p><a href="https://docs.rs/elfo/0.2.0-alpha.8/elfo/time/struct.Interval.html">The <code>Interval</code> source</a> is designed to generate messages at a defined time period.</p>
<p>To activate the source, employ either the <code>start(period)</code> or <code>start_after(delay, period)</code> methods, as shown below:</p>
<pre><code class="language-rust">use elfo::time::Interval;

#[message]
struct MyTick; // adhering to best practice by using the 'Tick' suffix

ctx.attach(Interval::new(MyTick))
.start(Duration::from_secs(42));

while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
MyTick =&gt; { /* handling code here */ },
});
}
</code></pre>
<h3 id="adjusting-the-period"><a class="header" href="#adjusting-the-period">Adjusting the period</a></h3>
<p>In instances where you need to adjust the timer's interval, possibly as a result of configuration changes, the <code>interval.set_period()</code> method comes in handy:</p>
<pre><code class="language-rust">use elfo::{time::Interval, messages::ConfigUpdated};

#[message]
struct MyTick; // adhering to best practice by using the 'Tick' suffix

let interval = ctx.attach(Interval::new(MyTick));
interval.start(ctx.config().period);

while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
ConfigUpdated =&gt; {
interval.set_period(ctx.config().period);
},
MyTick =&gt; { /* handling code here */ },
});
}
</code></pre>
<p>To halt the timer without detaching the interval, use <code>interval.stop()</code>. This method differs from <code>interval.terminate()</code> as it allows for the possibility to restart the timer later using <code>interval.start(period)</code> or <code>start_after(delay, period)</code> methods.</p>
<p>It's essential to note that calling <code>interval.start()</code> at different points can yield varied behavior compared to invoking <code>interval.set_period()</code> on an already active interval. The <code>interval.set_period()</code> method solely modifies the existing interval without resetting the time origin, contrasting with the rescheduling functions (<code>start_*</code> methods). Here's a visual representation to illustrate the differences between these two approaches:</p>
<pre><code>set_period(10s): | 5s | 5s | 5s | # 10s | 10s |
start(10s): | 5s | 5s | 5s | # 10s | 10s |
#
called here
</code></pre>
<h3 id="tracing"><a class="header" href="#tracing">Tracing</a></h3>
<p>Every message starts a new trace, thus a new <a href="./ch05-04-tracing.html#traceid"><code>TraceId</code></a> is generated and assigned to the current scope.</p>
<h2 id="delays"><a class="header" href="#delays">Delays</a></h2>
<p><a href="https://docs.rs/elfo/0.2.0-alpha.8/elfo/time/struct.Delay.html">The <code>Delay</code> source</a> is designed to generate one message after a specified time:</p>
<pre><code class="language-rust">use elfo::time::Delay;

#[message]
struct MyTick; // adhering to best practice by using the 'Tick' suffix

while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
SomeEvent =&gt; {
ctx.attach(Delay::new(ctx.config().delay, MyTick));
},
MyTick =&gt; { /* handling code here */ },
});
}
</code></pre>
<p>This source is detached automatically after emitting a message, there is no way to reschedule it. To stop delay before emitting, use the <code>delay.terminate()</code> method.</p>
<h3 id="tracing-1"><a class="header" href="#tracing-1">Tracing</a></h3>
<p>The emitted message continues the current trace. The reason for it is that this source is usually used for delaying specific action, so logically it's continues the current trace.</p>
<h2 id="signals"><a class="header" href="#signals">Signals</a></h2>
<p><a href="https://docs.rs/elfo/0.2.0-alpha.8/elfo/signal/struct.Signal.html">The <code>Signal</code> source</a> is designed to generate a message once a signal is received:</p>
<pre><code class="language-rust">use elfo::signal::{Signal, SignalKind};

#[message]
struct ReloadFile;

ctx.attach(Signal::new(SignalKind::UnixHangup, ReloadFile));

while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
ReloadFile =&gt; { /* handling code here */ },
});
}
</code></pre>
<p>It's based on the tokio implementation, so it should be useful to read
about <a href="https://docs.rs/tokio/latest/tokio/signal/unix/struct.Signal.html">caveats</a>.</p>
<h3 id="tracing-2"><a class="header" href="#tracing-2">Tracing</a></h3>
<p>Every message starts a new trace, thus a new trace id is generated and assigned to the current scope.</p>
<h2 id="streams"><a class="header" href="#streams">Streams</a></h2>
<p><a href="https://docs.rs/elfo/0.2.0-alpha.8/elfo/stream/struct.Stream.html">The <code>Stream</code> source</a> is designed to wrap existing futures/streams of messages. Items can be either any instance of <a href="https://docs.rs/elfo/0.2.0-alpha.8/elfo/trait.Message.html"><code>Message</code></a> or <code>Result&lt;impl Message, impl Message&gt;</code>.</p>
<p>Once stream is exhausted, it's detached automatically.</p>
<h3 id="futures"><a class="header" href="#futures">Futures</a></h3>
<p>Utilize <code>Stream::once()</code> when implementing subtasks such as initiating a background request:</p>
<pre><code class="language-rust">use elfo::stream::Stream;

#[message]
struct DataFetched(u32);

#[message]
struct FetchDataFailed(String);

async fn fetch_data() -&gt; Result&lt;DataFetched, FetchDataFailed&gt; {
// ... implementation details ...
}

while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
SomeEvent =&gt; {
ctx.attach(Stream::once(fetch_data()));
},
DataFetched =&gt; { /* handling code here */ },
FetchDataFailed =&gt; { /* error handling code here */ },
});
}
</code></pre>
<h3 id="futuresstream"><a class="header" href="#futuresstream">futures::Stream</a></h3>
<p><code>Stream::from_futures03</code> is used to wrap existing <code>futures::Stream</code>:</p>
<pre><code class="language-rust">use elfo::stream::Stream;

#[message]
struct MyItem(u32);

let stream = futures::stream::iter(vec![MyItem(0), MyItem(1)]);
ctx.attach(Stream::from_futures03(stream));

while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
MyItem =&gt; { /* handling code here */ },
});
}
</code></pre>
<p>To produce messages of different types from the stream, it's possible to cast specific messages into <code>AnyMessage</code> (undocumented for now):</p>
<pre><code class="language-rust">futures::stream::iter(vec![MyItem(0).upcast(), AnotherItem.upcast()])
</code></pre>
<h3 id="generators"><a class="header" href="#generators">Generators</a></h3>
<p><code>Stream::generate</code> is an alternative to the <a href="https://docs.rs/async-stream">async-stream</a> crate, offering the same functionality without the need for macros, thereby being formatted by rustfmt:</p>
<pre><code class="language-rust">use elfo::stream::Stream;

#[message]
struct SomeMessage(u32);

#[message]
struct AnotherMessage;

ctx.attach(Stream::generate(|mut e| async move {
e.emit(SomeMessage(42)).await;
e.emit(AnotherMessage).await;
}));

while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
SomeMessage(no) | AnotherMessage =&gt; { /* handling code here */ },
});
}
</code></pre>
<h3 id="tracing-3"><a class="header" href="#tracing-3">Tracing</a></h3>
<p>The trace handling varies depending upon the method used to create the stream:</p>
<ul>
<li>For <code>Stream::from_futures03()</code>: each message initiates a new trace.</li>
<li>For <code>Stream::once()</code> and <code>Stream::generate()</code>: the existing trace is continued.</li>
</ul>
<p>To override the current trace, leverage <code>scope::set_trace_id()</code> at any time.</p>

</main>

Expand Down
Loading

0 comments on commit 17cdf5e

Please sign in to comment.