Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream improvments #297

Merged
merged 1 commit into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions lib/grammers-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ serde = ["grammers-tl-types/impl-serde"]
[dependencies]
chrono = "0.4.38"
futures = "0.3.31"
futures-util = { version = "0.3.30", default-features = false, features = [
"alloc"
] }
grammers-crypto = { path = "../grammers-crypto", version = "0.7.0" }
grammers-mtproto = { path = "../grammers-mtproto", version = "0.7.0" }
grammers-mtsender = { path = "../grammers-mtsender", version = "0.7.0" }
Expand Down
4 changes: 0 additions & 4 deletions lib/grammers-client/DEPS.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,6 @@ without having to use `Box`.

Provides Stream functionality

## futures-util

Provides useful functions for working with futures/tasks.

## url

Used to parse certain URLs to offer features such as joining private chats via their invite link.
14 changes: 9 additions & 5 deletions lib/grammers-client/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@
//! cargo run --example echo -- BOT_TOKEN
//! ```

use futures::StreamExt;
use futures_util::future::{select, Either};
use grammers_client::session::Session;
use grammers_client::{Client, Config, InitParams, Update};
use simple_logger::SimpleLogger;
use std::env;
use std::pin::pin;

use futures::{
future::{select, Either},
StreamExt,
};
use simple_logger::SimpleLogger;
use tokio::{runtime, task};

use grammers_client::session::Session;
use grammers_client::{Client, Config, InitParams, Update};

type Result = std::result::Result<(), Box<dyn std::error::Error>>;

const SESSION_FILE: &str = "echo.session";
Expand Down
14 changes: 9 additions & 5 deletions lib/grammers-client/examples/inline-pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@
//! how much data a button's payload can contain, and to keep it simple, we're storing it inline
//! in decimal, so the numbers can't get too large).

use futures::StreamExt;
use futures_util::future::{select, Either};
use grammers_client::session::Session;
use grammers_client::{button, reply_markup, Client, Config, InputMessage, Update};
use simple_logger::SimpleLogger;
use std::env;
use std::pin::pin;

use futures::{
future::{select, Either},
StreamExt,
};
use simple_logger::SimpleLogger;
use tokio::{runtime, task};

use grammers_client::session::Session;
use grammers_client::{button, reply_markup, Client, Config, InputMessage, Update};

type Result = std::result::Result<(), Box<dyn std::error::Error>>;

const SESSION_FILE: &str = "inline-pagination.session";
Expand Down
22 changes: 10 additions & 12 deletions lib/grammers-client/src/client/chats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ use grammers_session::{PackedChat, PackedType};
use grammers_tl_types as tl;

use super::Client;
use crate::types::{
chats::AdminRightsBuilderInner, chats::BannedRightsBuilderInner, AdminRightsBuilder,
BannedRightsBuilder, Chat, ChatMap, IterBuffer, Message, Participant, Photo, User,
use crate::{
types::{
chats::{AdminRightsBuilderInner, BannedRightsBuilderInner},
AdminRightsBuilder, BannedRightsBuilder, Chat, ChatMap, IterBuffer, Message, Participant,
Photo, User,
},
utils::poll_future_ready,
};
pub use grammers_mtsender::{AuthorizationError, InvocationError};

Expand Down Expand Up @@ -210,9 +214,7 @@ impl Stream for ParticipantStream {
Self::Empty => {}
Self::Chat { buffer, .. } => {
if buffer.is_empty() {
let this = self.fill_buffer();
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) {
return Poll::Ready(Some(Err(e)));
}
}
Expand All @@ -225,9 +227,7 @@ impl Stream for ParticipantStream {
}
}

let this = self.fill_buffer();
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) {
return Poll::Ready(Some(Err(e)));
}
}
Expand Down Expand Up @@ -341,9 +341,7 @@ impl Stream for ProfilePhotoStream {
}
}

let this = self.fill_buffer();
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) {
return Poll::Ready(Some(Err(e)));
}
}
Expand Down
5 changes: 2 additions & 3 deletions lib/grammers-client/src/client/dialogs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use grammers_session::PackedChat;
use grammers_tl_types as tl;

use crate::types::{ChatMap, Dialog, IterBuffer, Message};
use crate::utils::poll_future_ready;
use crate::Client;

const MAX_LIMIT: usize = 100;
Expand Down Expand Up @@ -79,9 +80,7 @@ impl Stream for DialogStream {

let result = {
self.request.limit = self.determine_limit(MAX_LIMIT);
let this = self.client.invoke(&self.request);
futures::pin_mut!(this);
futures::ready!(this.poll(cx))
poll_future_ready!(cx, self.client.invoke(&self.request))
}?;

let (dialogs, messages, users, chats) = match result {
Expand Down
18 changes: 7 additions & 11 deletions lib/grammers-client/src/client/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use std::future::Future;
use std::task::Poll;
use std::{io::SeekFrom, path::Path, sync::Arc};

use futures::{Stream, TryStreamExt};
use futures_util::stream::{FuturesUnordered, StreamExt};
use futures::{
stream::{FuturesUnordered, StreamExt},
Stream, TryStreamExt,
};
use tokio::sync::mpsc::unbounded_channel;
use tokio::{
fs,
Expand All @@ -22,7 +24,7 @@ use grammers_mtsender::InvocationError;
use grammers_tl_types as tl;

use crate::types::{photo_sizes::PhotoSize, Downloadable, Media, Uploaded};
use crate::utils::generate_random_id;
use crate::utils::{generate_random_id, poll_future_ready};
use crate::Client;

pub const MIN_CHUNK_SIZE: i32 = 4 * 1024;
Expand Down Expand Up @@ -139,15 +141,9 @@ impl Stream for DownloadStream {
loop {
let result = match self.dc.take() {
Some(dc) => {
let this = self.client.invoke_in_dc(&self.request, dc as i32);
futures::pin_mut!(this);
futures::ready!(this.poll(cx))
}
None => {
let this = self.client.invoke(&self.request);
futures::pin_mut!(this);
futures::ready!(this.poll(cx))
poll_future_ready!(cx, self.client.invoke_in_dc(&self.request, dc as i32))
}
None => poll_future_ready!(cx, self.client.invoke(&self.request)),
};

break match result {
Expand Down
14 changes: 4 additions & 10 deletions lib/grammers-client/src/client/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! Methods related to sending messages.
use crate::types::message::EMPTY_MESSAGE;
use crate::types::{InputReactions, IterBuffer, Message};
use crate::utils::{generate_random_id, generate_random_ids};
use crate::utils::{generate_random_id, generate_random_ids, poll_future_ready};
use crate::{types, ChatMap, Client, InputMedia};
use chrono::{DateTime, FixedOffset};
use futures::Stream;
Expand Down Expand Up @@ -249,9 +249,7 @@ impl Stream for MessageStream {
{
self.request.limit = self.determine_limit(MAX_LIMIT);
let limit = self.request.limit;
let this = self.fill_buffer(limit);
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
if let Err(e) = poll_future_ready!(cx, self.fill_buffer(limit)) {
return Poll::Ready(Some(Err(e)));
}
}
Expand Down Expand Up @@ -390,9 +388,7 @@ impl Stream for SearchStream {
{
self.request.limit = self.determine_limit(MAX_LIMIT);
let limit = self.request.limit;
let this = self.fill_buffer(limit);
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
if let Err(e) = poll_future_ready!(cx, self.fill_buffer(limit)) {
return Poll::Ready(Some(Err(e)));
}
}
Expand Down Expand Up @@ -479,9 +475,7 @@ impl Stream for GlobalSearchStream {
let offset_rate = {
self.request.limit = self.determine_limit(MAX_LIMIT);
let limit = self.request.limit;
let this = self.fill_buffer(limit);
futures::pin_mut!(this);
match futures::ready!(this.poll(cx)) {
match poll_future_ready!(cx, self.fill_buffer(limit)) {
Ok(offset_rate) => offset_rate,
Err(e) => return Poll::Ready(Some(Err(e))),
}
Expand Down
Loading
Loading