Skip to content

Commit

Permalink
Revert "Cleanup stream handling and improve update streams (Lonami#297)"
Browse files Browse the repository at this point in the history
This reverts commit bb46ef0.
  • Loading branch information
YouKnow-sys committed Dec 21, 2024
1 parent bb46ef0 commit d5e40d1
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 156 deletions.
3 changes: 3 additions & 0 deletions lib/grammers-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ serde = ["grammers-tl-types/impl-serde"]
[dependencies]
chrono = "0.4.38"
futures = "0.3.31"
futures-util = { version = "0.3.30", default-features = false, features = [
"alloc"
] }
grammers-crypto = { path = "../grammers-crypto", version = "0.7.0" }
grammers-mtproto = { path = "../grammers-mtproto", version = "0.7.0" }
grammers-mtsender = { path = "../grammers-mtsender", version = "0.7.0" }
Expand Down
4 changes: 4 additions & 0 deletions lib/grammers-client/DEPS.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ without having to use `Box`.

Provides Stream functionality

## futures-util

Provides useful functions for working with futures/tasks.

## url

Used to parse certain URLs to offer features such as joining private chats via their invite link.
14 changes: 5 additions & 9 deletions lib/grammers-client/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,15 @@
//! cargo run --example echo -- BOT_TOKEN
//! ```
use futures::StreamExt;
use futures_util::future::{select, Either};
use grammers_client::session::Session;
use grammers_client::{Client, Config, InitParams, Update};
use simple_logger::SimpleLogger;
use std::env;
use std::pin::pin;

use futures::{
future::{select, Either},
StreamExt,
};
use simple_logger::SimpleLogger;
use tokio::{runtime, task};

use grammers_client::session::Session;
use grammers_client::{Client, Config, InitParams, Update};

type Result = std::result::Result<(), Box<dyn std::error::Error>>;

const SESSION_FILE: &str = "echo.session";
Expand Down
14 changes: 5 additions & 9 deletions lib/grammers-client/examples/inline-pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,15 @@
//! how much data a button's payload can contain, and to keep it simple, we're storing it inline
//! in decimal, so the numbers can't get too large).
use futures::StreamExt;
use futures_util::future::{select, Either};
use grammers_client::session::Session;
use grammers_client::{button, reply_markup, Client, Config, InputMessage, Update};
use simple_logger::SimpleLogger;
use std::env;
use std::pin::pin;

use futures::{
future::{select, Either},
StreamExt,
};
use simple_logger::SimpleLogger;
use tokio::{runtime, task};

use grammers_client::session::Session;
use grammers_client::{button, reply_markup, Client, Config, InputMessage, Update};

type Result = std::result::Result<(), Box<dyn std::error::Error>>;

const SESSION_FILE: &str = "inline-pagination.session";
Expand Down
22 changes: 12 additions & 10 deletions lib/grammers-client/src/client/chats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@ use grammers_session::{PackedChat, PackedType};
use grammers_tl_types as tl;

use super::Client;
use crate::{
types::{
chats::{AdminRightsBuilderInner, BannedRightsBuilderInner},
AdminRightsBuilder, BannedRightsBuilder, Chat, ChatMap, IterBuffer, Message, Participant,
Photo, User,
},
utils::poll_future_ready,
use crate::types::{
chats::AdminRightsBuilderInner, chats::BannedRightsBuilderInner, AdminRightsBuilder,
BannedRightsBuilder, Chat, ChatMap, IterBuffer, Message, Participant, Photo, User,
};
pub use grammers_mtsender::{AuthorizationError, InvocationError};

Expand Down Expand Up @@ -214,7 +210,9 @@ impl Stream for ParticipantStream {
Self::Empty => {}
Self::Chat { buffer, .. } => {
if buffer.is_empty() {
if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) {
let this = self.fill_buffer();
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
return Poll::Ready(Some(Err(e)));
}
}
Expand All @@ -227,7 +225,9 @@ impl Stream for ParticipantStream {
}
}

if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) {
let this = self.fill_buffer();
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
return Poll::Ready(Some(Err(e)));
}
}
Expand Down Expand Up @@ -341,7 +341,9 @@ impl Stream for ProfilePhotoStream {
}
}

if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) {
let this = self.fill_buffer();
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
return Poll::Ready(Some(Err(e)));
}
}
Expand Down
5 changes: 3 additions & 2 deletions lib/grammers-client/src/client/dialogs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use grammers_session::PackedChat;
use grammers_tl_types as tl;

use crate::types::{ChatMap, Dialog, IterBuffer, Message};
use crate::utils::poll_future_ready;
use crate::Client;

const MAX_LIMIT: usize = 100;
Expand Down Expand Up @@ -80,7 +79,9 @@ impl Stream for DialogStream {

let result = {
self.request.limit = self.determine_limit(MAX_LIMIT);
poll_future_ready!(cx, self.client.invoke(&self.request))
let this = self.client.invoke(&self.request);
futures::pin_mut!(this);
futures::ready!(this.poll(cx))
}?;

let (dialogs, messages, users, chats) = match result {
Expand Down
18 changes: 11 additions & 7 deletions lib/grammers-client/src/client/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ use std::future::Future;
use std::task::Poll;
use std::{io::SeekFrom, path::Path, sync::Arc};

use futures::{
stream::{FuturesUnordered, StreamExt},
Stream, TryStreamExt,
};
use futures::{Stream, TryStreamExt};
use futures_util::stream::{FuturesUnordered, StreamExt};
use tokio::sync::mpsc::unbounded_channel;
use tokio::{
fs,
Expand All @@ -24,7 +22,7 @@ use grammers_mtsender::InvocationError;
use grammers_tl_types as tl;

use crate::types::{photo_sizes::PhotoSize, Downloadable, Media, Uploaded};
use crate::utils::{generate_random_id, poll_future_ready};
use crate::utils::generate_random_id;
use crate::Client;

pub const MIN_CHUNK_SIZE: i32 = 4 * 1024;
Expand Down Expand Up @@ -141,9 +139,15 @@ impl Stream for DownloadStream {
loop {
let result = match self.dc.take() {
Some(dc) => {
poll_future_ready!(cx, self.client.invoke_in_dc(&self.request, dc as i32))
let this = self.client.invoke_in_dc(&self.request, dc as i32);
futures::pin_mut!(this);
futures::ready!(this.poll(cx))
}
None => {
let this = self.client.invoke(&self.request);
futures::pin_mut!(this);
futures::ready!(this.poll(cx))
}
None => poll_future_ready!(cx, self.client.invoke(&self.request)),
};

break match result {
Expand Down
14 changes: 10 additions & 4 deletions lib/grammers-client/src/client/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! Methods related to sending messages.
use crate::types::message::EMPTY_MESSAGE;
use crate::types::{InputReactions, IterBuffer, Message};
use crate::utils::{generate_random_id, generate_random_ids, poll_future_ready};
use crate::utils::{generate_random_id, generate_random_ids};
use crate::{types, ChatMap, Client, InputMedia};
use chrono::{DateTime, FixedOffset};
use futures::Stream;
Expand Down Expand Up @@ -249,7 +249,9 @@ impl Stream for MessageStream {
{
self.request.limit = self.determine_limit(MAX_LIMIT);
let limit = self.request.limit;
if let Err(e) = poll_future_ready!(cx, self.fill_buffer(limit)) {
let this = self.fill_buffer(limit);
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
return Poll::Ready(Some(Err(e)));
}
}
Expand Down Expand Up @@ -388,7 +390,9 @@ impl Stream for SearchStream {
{
self.request.limit = self.determine_limit(MAX_LIMIT);
let limit = self.request.limit;
if let Err(e) = poll_future_ready!(cx, self.fill_buffer(limit)) {
let this = self.fill_buffer(limit);
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
return Poll::Ready(Some(Err(e)));
}
}
Expand Down Expand Up @@ -475,7 +479,9 @@ impl Stream for GlobalSearchStream {
let offset_rate = {
self.request.limit = self.determine_limit(MAX_LIMIT);
let limit = self.request.limit;
match poll_future_ready!(cx, self.fill_buffer(limit)) {
let this = self.fill_buffer(limit);
futures::pin_mut!(this);
match futures::ready!(this.poll(cx)) {
Ok(offset_rate) => offset_rate,
Err(e) => return Poll::Ready(Some(Err(e))),
}
Expand Down
Loading

0 comments on commit d5e40d1

Please sign in to comment.