diff --git a/lib/grammers-client/Cargo.toml b/lib/grammers-client/Cargo.toml index 55e89175..b1459306 100644 --- a/lib/grammers-client/Cargo.toml +++ b/lib/grammers-client/Cargo.toml @@ -23,9 +23,6 @@ serde = ["grammers-tl-types/impl-serde"] [dependencies] chrono = "0.4.38" futures = "0.3.31" -futures-util = { version = "0.3.30", default-features = false, features = [ - "alloc" -] } grammers-crypto = { path = "../grammers-crypto", version = "0.7.0" } grammers-mtproto = { path = "../grammers-mtproto", version = "0.7.0" } grammers-mtsender = { path = "../grammers-mtsender", version = "0.7.0" } diff --git a/lib/grammers-client/DEPS.md b/lib/grammers-client/DEPS.md index c6167737..a57e82fe 100644 --- a/lib/grammers-client/DEPS.md +++ b/lib/grammers-client/DEPS.md @@ -80,10 +80,6 @@ without having to use `Box`. Provides Stream functionality -## futures-util - -Provides useful functions for working with futures/tasks. - ## url Used to parse certain URLs to offer features such as joining private chats via their invite link. diff --git a/lib/grammers-client/examples/echo.rs b/lib/grammers-client/examples/echo.rs index aa634c4d..a42760c6 100644 --- a/lib/grammers-client/examples/echo.rs +++ b/lib/grammers-client/examples/echo.rs @@ -10,15 +10,19 @@ //! cargo run --example echo -- BOT_TOKEN //! ``` -use futures::StreamExt; -use futures_util::future::{select, Either}; -use grammers_client::session::Session; -use grammers_client::{Client, Config, InitParams, Update}; -use simple_logger::SimpleLogger; use std::env; use std::pin::pin; + +use futures::{ + future::{select, Either}, + StreamExt, +}; +use simple_logger::SimpleLogger; use tokio::{runtime, task}; +use grammers_client::session::Session; +use grammers_client::{Client, Config, InitParams, Update}; + type Result = std::result::Result<(), Box>; const SESSION_FILE: &str = "echo.session"; diff --git a/lib/grammers-client/examples/inline-pagination.rs b/lib/grammers-client/examples/inline-pagination.rs index 3003fd71..8afbd41b 100644 --- a/lib/grammers-client/examples/inline-pagination.rs +++ b/lib/grammers-client/examples/inline-pagination.rs @@ -22,15 +22,19 @@ //! how much data a button's payload can contain, and to keep it simple, we're storing it inline //! in decimal, so the numbers can't get too large). -use futures::StreamExt; -use futures_util::future::{select, Either}; -use grammers_client::session::Session; -use grammers_client::{button, reply_markup, Client, Config, InputMessage, Update}; -use simple_logger::SimpleLogger; use std::env; use std::pin::pin; + +use futures::{ + future::{select, Either}, + StreamExt, +}; +use simple_logger::SimpleLogger; use tokio::{runtime, task}; +use grammers_client::session::Session; +use grammers_client::{button, reply_markup, Client, Config, InputMessage, Update}; + type Result = std::result::Result<(), Box>; const SESSION_FILE: &str = "inline-pagination.session"; diff --git a/lib/grammers-client/src/client/chats.rs b/lib/grammers-client/src/client/chats.rs index df779d6d..02bd1616 100644 --- a/lib/grammers-client/src/client/chats.rs +++ b/lib/grammers-client/src/client/chats.rs @@ -23,9 +23,13 @@ use grammers_session::{PackedChat, PackedType}; use grammers_tl_types as tl; use super::Client; -use crate::types::{ - chats::AdminRightsBuilderInner, chats::BannedRightsBuilderInner, AdminRightsBuilder, - BannedRightsBuilder, Chat, ChatMap, IterBuffer, Message, Participant, Photo, User, +use crate::{ + types::{ + chats::{AdminRightsBuilderInner, BannedRightsBuilderInner}, + AdminRightsBuilder, BannedRightsBuilder, Chat, ChatMap, IterBuffer, Message, Participant, + Photo, User, + }, + utils::poll_future_ready, }; pub use grammers_mtsender::{AuthorizationError, InvocationError}; @@ -210,9 +214,7 @@ impl Stream for ParticipantStream { Self::Empty => {} Self::Chat { buffer, .. } => { if buffer.is_empty() { - let this = self.fill_buffer(); - futures::pin_mut!(this); - if let Err(e) = futures::ready!(this.poll(cx)) { + if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) { return Poll::Ready(Some(Err(e))); } } @@ -225,9 +227,7 @@ impl Stream for ParticipantStream { } } - let this = self.fill_buffer(); - futures::pin_mut!(this); - if let Err(e) = futures::ready!(this.poll(cx)) { + if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) { return Poll::Ready(Some(Err(e))); } } @@ -341,9 +341,7 @@ impl Stream for ProfilePhotoStream { } } - let this = self.fill_buffer(); - futures::pin_mut!(this); - if let Err(e) = futures::ready!(this.poll(cx)) { + if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) { return Poll::Ready(Some(Err(e))); } } diff --git a/lib/grammers-client/src/client/dialogs.rs b/lib/grammers-client/src/client/dialogs.rs index 68707059..87d07073 100644 --- a/lib/grammers-client/src/client/dialogs.rs +++ b/lib/grammers-client/src/client/dialogs.rs @@ -16,6 +16,7 @@ use grammers_session::PackedChat; use grammers_tl_types as tl; use crate::types::{ChatMap, Dialog, IterBuffer, Message}; +use crate::utils::poll_future_ready; use crate::Client; const MAX_LIMIT: usize = 100; @@ -79,9 +80,7 @@ impl Stream for DialogStream { let result = { self.request.limit = self.determine_limit(MAX_LIMIT); - let this = self.client.invoke(&self.request); - futures::pin_mut!(this); - futures::ready!(this.poll(cx)) + poll_future_ready!(cx, self.client.invoke(&self.request)) }?; let (dialogs, messages, users, chats) = match result { diff --git a/lib/grammers-client/src/client/files.rs b/lib/grammers-client/src/client/files.rs index 81afe20c..e423c48f 100644 --- a/lib/grammers-client/src/client/files.rs +++ b/lib/grammers-client/src/client/files.rs @@ -9,8 +9,10 @@ use std::future::Future; use std::task::Poll; use std::{io::SeekFrom, path::Path, sync::Arc}; -use futures::{Stream, TryStreamExt}; -use futures_util::stream::{FuturesUnordered, StreamExt}; +use futures::{ + stream::{FuturesUnordered, StreamExt}, + Stream, TryStreamExt, +}; use tokio::sync::mpsc::unbounded_channel; use tokio::{ fs, @@ -22,7 +24,7 @@ use grammers_mtsender::InvocationError; use grammers_tl_types as tl; use crate::types::{photo_sizes::PhotoSize, Downloadable, Media, Uploaded}; -use crate::utils::generate_random_id; +use crate::utils::{generate_random_id, poll_future_ready}; use crate::Client; pub const MIN_CHUNK_SIZE: i32 = 4 * 1024; @@ -139,15 +141,9 @@ impl Stream for DownloadStream { loop { let result = match self.dc.take() { Some(dc) => { - let this = self.client.invoke_in_dc(&self.request, dc as i32); - futures::pin_mut!(this); - futures::ready!(this.poll(cx)) - } - None => { - let this = self.client.invoke(&self.request); - futures::pin_mut!(this); - futures::ready!(this.poll(cx)) + poll_future_ready!(cx, self.client.invoke_in_dc(&self.request, dc as i32)) } + None => poll_future_ready!(cx, self.client.invoke(&self.request)), }; break match result { diff --git a/lib/grammers-client/src/client/messages.rs b/lib/grammers-client/src/client/messages.rs index af9e80d0..2057c34b 100644 --- a/lib/grammers-client/src/client/messages.rs +++ b/lib/grammers-client/src/client/messages.rs @@ -9,7 +9,7 @@ //! Methods related to sending messages. use crate::types::message::EMPTY_MESSAGE; use crate::types::{InputReactions, IterBuffer, Message}; -use crate::utils::{generate_random_id, generate_random_ids}; +use crate::utils::{generate_random_id, generate_random_ids, poll_future_ready}; use crate::{types, ChatMap, Client, InputMedia}; use chrono::{DateTime, FixedOffset}; use futures::Stream; @@ -249,9 +249,7 @@ impl Stream for MessageStream { { self.request.limit = self.determine_limit(MAX_LIMIT); let limit = self.request.limit; - let this = self.fill_buffer(limit); - futures::pin_mut!(this); - if let Err(e) = futures::ready!(this.poll(cx)) { + if let Err(e) = poll_future_ready!(cx, self.fill_buffer(limit)) { return Poll::Ready(Some(Err(e))); } } @@ -390,9 +388,7 @@ impl Stream for SearchStream { { self.request.limit = self.determine_limit(MAX_LIMIT); let limit = self.request.limit; - let this = self.fill_buffer(limit); - futures::pin_mut!(this); - if let Err(e) = futures::ready!(this.poll(cx)) { + if let Err(e) = poll_future_ready!(cx, self.fill_buffer(limit)) { return Poll::Ready(Some(Err(e))); } } @@ -479,9 +475,7 @@ impl Stream for GlobalSearchStream { let offset_rate = { self.request.limit = self.determine_limit(MAX_LIMIT); let limit = self.request.limit; - let this = self.fill_buffer(limit); - futures::pin_mut!(this); - match futures::ready!(this.poll(cx)) { + match poll_future_ready!(cx, self.fill_buffer(limit)) { Ok(offset_rate) => offset_rate, Err(e) => return Poll::Ready(Some(Err(e))), } diff --git a/lib/grammers-client/src/client/updates.rs b/lib/grammers-client/src/client/updates.rs index 92745248..531f0f21 100644 --- a/lib/grammers-client/src/client/updates.rs +++ b/lib/grammers-client/src/client/updates.rs @@ -8,49 +8,83 @@ //! Methods to deal with and offer access to updates. -use super::Client; -use crate::types::{ChatMap, Update}; -use futures::stream::FusedStream; -use futures::Stream; -use futures_util::future::{select, Either}; -pub use grammers_mtsender::{AuthorizationError, InvocationError}; -use grammers_session::channel_id; -pub use grammers_session::{PrematureEndReason, UpdateState}; -use grammers_tl_types as tl; use std::future::Future; use std::pin::pin; use std::sync::Arc; use std::task::Poll; use std::time::{Duration, Instant}; + +use futures::{ + future::{select, Either}, + stream::FusedStream, + Stream, +}; use tokio::time::sleep_until; +pub use grammers_mtsender::{AuthorizationError, InvocationError}; +use grammers_session::channel_id; +pub use grammers_session::{PrematureEndReason, UpdateState}; +use grammers_tl_types as tl; + +use super::Client; +use crate::types::{ChatMap, Update}; +use crate::utils::{poll_future, poll_future_ready}; + /// How long to wait after warning the user that the updates limit was exceeded. const UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN: Duration = Duration::from_secs(300); impl Client { - /// Returns the next update from the buffer where they are queued until used. + /// Returns a stream over raw updates. + /// + /// # Example + /// + /// ``` + /// # async fn f(client: grammers_client::Client) -> Result<(), Box> { + /// use futures::TryStreamExt; + /// use grammers_client::Update; + /// + /// client + /// .raw_update_stream() + /// .try_for_each(|(update, _)| { + /// // Print all incoming updates in their raw form + /// dbg!(update); + /// futures::future::ready(Ok(())) + /// }) + /// .await; + /// # Ok(()) + /// # } + /// ``` + pub fn raw_update_stream(&self) -> RawUpdateStream<'_> { + RawUpdateStream { client: self } + } + + /// Returns a stream over updates. /// /// # Example /// /// ``` /// # async fn f(client: grammers_client::Client) -> Result<(), Box> { + /// use futures::TryStreamExt; /// use grammers_client::Update; /// - /// loop { - /// let update = client.next_update().await?; - /// // Echo incoming messages and ignore everything else - /// match update { - /// Update::NewMessage(mut message) if !message.outgoing() => { - /// message.respond(message.text()).await?; + /// client + /// .update_stream() + /// .try_for_each_concurrent(None, |update| async { + /// match update { + /// Update::NewMessage(message) if !message.outgoing() => { + /// message.respond(message.text()).await.map(|_| ()) + /// } + /// _ => Ok(()), /// } - /// _ => {} - /// } - /// } + /// }) + /// .await?; /// # Ok(()) /// # } /// ``` pub fn update_stream(&self) -> UpdateStream<'_> { - UpdateStream { client: self } + UpdateStream { + raw_stream: self.raw_update_stream(), + } } pub(crate) fn process_socket_updates(&self, all_updates: Vec) { @@ -132,33 +166,16 @@ impl Client { } } -pub struct UpdateStream<'a> { +pub struct RawUpdateStream<'a> { client: &'a Client, } -impl<'a> UpdateStream<'a> { +impl<'a> RawUpdateStream<'a> { /// Returns the next raw update and associated chat map from the buffer where they are queued until used. /// - /// # Example - /// - /// ``` - /// # async fn f(client: grammers_client::Client) -> Result<(), Box> { - /// loop { - /// let (update, chats) = client.next_raw_update().await?; - /// - /// // Print all incoming updates in their raw form - /// dbg!(update); - /// } - /// # Ok(()) - /// # } - /// - /// ``` - /// /// P.S. If you don't receive updateBotInlineSend, go to [@BotFather](https://t.me/BotFather), select your bot and click "Bot Settings", then "Inline Feedback" and select probability. /// - pub async fn next_raw_update( - &self, - ) -> Result<(tl::enums::Update, Arc), InvocationError> { + async fn next_raw_update(&self) -> Result<(tl::enums::Update, Arc), InvocationError> { loop { let (deadline, get_diff, get_channel_diff) = { let state = &mut *self.client.0.state.write().unwrap(); @@ -279,6 +296,37 @@ impl<'a> UpdateStream<'a> { } } +impl<'a> Stream for RawUpdateStream<'a> { + type Item = Result<(tl::enums::Update, Arc), InvocationError>; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + poll_future!(cx, self.next_raw_update()).map(Some) + } +} + +impl<'a> FusedStream for RawUpdateStream<'a> { + fn is_terminated(&self) -> bool { + // The update stream is a continuous flow of updates. + // As a long-running stream, it never reaches a + // terminated state, hence we always return false. + false + } +} + +pub struct UpdateStream<'a> { + raw_stream: RawUpdateStream<'a>, +} + +impl<'a> UpdateStream<'a> { + /// Consume the [`UpdateStream`] and return the underlying [`RawUpdateStream`]. + pub fn into_raw_update_stream(self) -> RawUpdateStream<'a> { + self.raw_stream + } +} + impl<'a> Stream for UpdateStream<'a> { type Item = Result; @@ -288,15 +336,13 @@ impl<'a> Stream for UpdateStream<'a> { ) -> Poll> { loop { let (update, chats) = { - let this = self.next_raw_update(); - futures::pin_mut!(this); - match futures::ready!(this.poll(cx)) { + match poll_future_ready!(cx, self.raw_stream.next_raw_update()) { Ok(update) => update, Err(e) => return Poll::Ready(Some(Err(e))), } }; - if let Some(update) = Update::new(self.client, update, &chats) { + if let Some(update) = Update::new(self.raw_stream.client, update, &chats) { return Poll::Ready(Some(Ok(update))); } } diff --git a/lib/grammers-client/src/lib.rs b/lib/grammers-client/src/lib.rs index a4a8b18a..fe22ae85 100644 --- a/lib/grammers-client/src/lib.rs +++ b/lib/grammers-client/src/lib.rs @@ -56,3 +56,6 @@ pub use grammers_mtproto::transport; pub use grammers_mtsender::{FixedReconnect, InvocationError, NoReconnect, ReconnectionPolicy}; pub use grammers_session as session; pub use grammers_tl_types; + +// re-export futures +pub use futures; diff --git a/lib/grammers-client/src/types/action.rs b/lib/grammers-client/src/types/action.rs index 11204fc8..e56de9cd 100644 --- a/lib/grammers-client/src/types/action.rs +++ b/lib/grammers-client/src/types/action.rs @@ -5,7 +5,7 @@ // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. -use futures_util::future::Either; +use futures::future::Either; use grammers_mtsender::InvocationError; use grammers_session::PackedChat; use grammers_tl_types as tl; @@ -123,7 +123,7 @@ impl ActionSender { tokio::pin!(action); - match futures_util::future::select(action, &mut future).await { + match futures::future::select(action, &mut future).await { Either::Left((_, _)) => continue, Either::Right((output, _)) => break output, } diff --git a/lib/grammers-client/src/utils.rs b/lib/grammers-client/src/utils.rs index a35082a3..5ac68552 100644 --- a/lib/grammers-client/src/utils.rs +++ b/lib/grammers-client/src/utils.rs @@ -103,3 +103,23 @@ pub(crate) fn always_find_entity( None => types::Chat::unpack(get_packed()), } } + +/// A helper macro to poll the future using the given context. +macro_rules! poll_future { + ($cx:expr, $($future:tt)*) => {{ + let this = $($future)*; + futures::pin_mut!(this); + this.poll($cx) + }}; +} + +/// A helper macro to poll the future using the given context. +/// return `Poll::Pending` early in case that future returned `Poll::Pending`. +macro_rules! poll_future_ready { + ($cx:expr, $($future:tt)*) => { + futures::ready!(crate::utils::poll_future!($cx, $($future)*)) + }; +} + +pub(crate) use poll_future; +pub(crate) use poll_future_ready;