Skip to content

Commit

Permalink
fix: event bus delete multiple disconnected
Browse files Browse the repository at this point in the history
  • Loading branch information
blckngm committed Aug 1, 2023
1 parent 0a46331 commit cf4d32e
Showing 1 changed file with 32 additions and 19 deletions.
51 changes: 32 additions & 19 deletions crates/relayer/src/event/bus.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use alloc::collections::VecDeque;
use alloc::vec::Vec;

use crossbeam_channel as channel;

pub struct EventBus<T> {
txs: VecDeque<channel::Sender<T>>,
txs: Vec<channel::Sender<T>>,
}

impl<T> Default for EventBus<T> {
Expand All @@ -14,34 +14,22 @@ impl<T> Default for EventBus<T> {

impl<T> EventBus<T> {
pub fn new() -> Self {
Self {
txs: VecDeque::new(),
}
Self { txs: Vec::new() }
}

pub fn subscribe(&mut self) -> channel::Receiver<T> {
let (tx, rx) = channel::unbounded();
self.txs.push_back(tx);
self.txs.push(tx);
rx
}

pub fn broadcast(&mut self, value: T)
where
T: Clone,
{
let mut disconnected = Vec::new();

for (idx, tx) in self.txs.iter().enumerate() {
// TODO: Avoid cloning when sending to last subscriber
if let Err(channel::SendError(_)) = tx.send(value.clone()) {
disconnected.push(idx);
}
}

// Remove all disconnected subscribers
for idx in disconnected {
self.txs.remove(idx);
}
// Send to all txs. Remove disconnected.
self.txs
.retain(|tx| !matches!(tx.send(value.clone()), Err(channel::SendError(_))));
}
}

Expand Down Expand Up @@ -117,4 +105,29 @@ mod tests {

assert_eq!(counter(), 20);
}

#[test]
fn multi_disconnected() {
let mut bus = EventBus::new();

let n = 10;
let mut rxs = vec![];

for _ in 0..n {
rxs.push(bus.subscribe());
}

rxs.remove(0);
rxs.remove(0);

bus.broadcast(42);
for rx in &rxs {
assert_eq!(rx.recv(), Ok(42));
}

bus.broadcast(43);
for rx in &rxs {
assert_eq!(rx.recv(), Ok(43));
}
}
}

0 comments on commit cf4d32e

Please sign in to comment.