-
Here is an implementation of the Sink trait for the broadcast sender:
use futures::task::{Context, Poll};
use futures::Sink;
use std::pin::Pin;
use tokio::sync::broadcast;
/// Wrapper around a [`broadcast::Sender`] that this file adapts to the
/// `futures::Sink` interface.
pub struct BroadcastSink<T> {
    /// The broadcast sender every item pushed into the sink is forwarded to.
    sender: broadcast::Sender<T>,
}
impl<T> BroadcastSink<T> {
pub fn new(sender: broadcast::Sender<T>) -> Self {
Self { sender }
}
}
/// `Sink` adapter over the wrapped broadcast sender.
///
/// Only `start_send` does any work; the three `poll_*` methods report
/// immediate success without inspecting the sender.
impl<T: Clone + std::fmt::Debug> Sink<T> for BroadcastSink<T> {
    /// Errors are whatever `broadcast::Sender::send` reports.
    type Error = broadcast::error::SendError<T>;

    /// Always ready: this method never returns `Poll::Pending`.
    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Hands `item` to the wrapped sender.
    ///
    /// The value `send` yields on success is discarded; a failure is
    /// returned to the caller unchanged.
    // NOTE(review): per Tokio's docs `send` fails when no receiver is
    // subscribed, which would surface here as a sink error -- confirm this is
    // the desired behaviour.
    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        self.sender.send(item).map(|_| ())
    }

    /// Nothing is buffered by this wrapper, so flushing completes immediately.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Reports success without touching the sender.
    // NOTE(review): the sender is not dropped here, so receivers presumably
    // only observe closure once the sink itself is dropped -- confirm.
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
#[cfg(test)]
mod test {
use futures::SinkExt;
use super::*;
/// Sends three values through the sink and checks that two independent
/// receivers each collect all of them, in order, before their `recv` loop
/// ends.
#[tokio::test]
async fn forwards_to_sink() {
    let (sender, mut first_rx) = broadcast::channel(10);
    let mut second_rx = sender.subscribe();
    let mut sink = BroadcastSink::new(sender);

    // Each task gathers values until `recv` returns an error.
    let first_task = tokio::spawn(async move {
        let mut received = Vec::new();
        loop {
            match first_rx.recv().await {
                Ok(value) => received.push(value),
                Err(_) => break received,
            }
        }
    });
    let second_task = tokio::spawn(async move {
        let mut received = Vec::new();
        loop {
            match second_rx.recv().await {
                Ok(value) => received.push(value),
                Err(_) => break received,
            }
        }
    });

    for value in [42, 43, 44] {
        sink.send(value).await.unwrap();
    }
    // The sink owns the only sender; drop it before joining the receiver tasks.
    drop(sink);

    let (first, second) = tokio::join!(first_task, second_task);
    assert_eq!(first.unwrap(), vec![42, 43, 44]);
    assert_eq!(second.unwrap(), vec![42, 43, 44]);
}
} Are there problems with this? Would it make sense to have this in mainline?
Beta Was this translation helpful? Give feedback.
Answered by
Darksonn
Oct 22, 2024
Replies: 1 comment 1 reply
-
No. Tokio does not depend on the futures crate. |
Beta Was this translation helpful? Give feedback.
1 reply
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It could potentially be in tokio-util, though.