From 43e69d1fedfc3dae9885a159d5c10cef703df796 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 19 Dec 2021 18:03:49 +0600 Subject: [PATCH 1/4] upgrade to ntex 0.5 --- CHANGES.md | 4 ++++ Cargo.toml | 4 ++-- src/client.rs | 40 ++++++++++++++++++++++++---------------- src/connector.rs | 48 +++++++++++------------------------------------- src/simple.rs | 21 ++++++++++----------- 5 files changed, 51 insertions(+), 66 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 7c780c5..b51e735 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.0-b.0] - 2021-12-19 + +* upgrade to ntex 0.5 + ## [0.2.4] - 2021-12-02 * Add memory pools support diff --git a/Cargo.toml b/Cargo.toml index 9a5e795..5d8a816 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-redis" -version = "0.2.4" +version = "0.3.0-b.0" authors = ["ntex contributors "] description = "Redis client" documentation = "https://docs.rs/ntex-redis" @@ -21,7 +21,7 @@ openssl = ["ntex/openssl"] rustls = ["ntex/rustls"] [dependencies] -ntex = "0.4.11" +ntex = "0.5.0-b.0" itoa = "0.4.5" btoi = "0.4.2" log = "0.4" diff --git a/src/client.rs b/src/client.rs index 5c0f2be..1f93080 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,8 +1,9 @@ use std::collections::VecDeque; use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; +use ntex::io::{IoBoxed, IoRef, OnDisconnect}; use ntex::util::{poll_fn, Either, Ready}; -use ntex::{channel::pool, framed::State, service::Service}; +use ntex::{channel::pool, service::Service}; use super::cmd::Command; use super::codec::{Codec, Request, Response}; @@ -13,20 +14,21 @@ type Queue = Rc>>>>; #[derive(Clone)] /// Shared redis client pub struct Client { - state: State, + io: IoRef, queue: Queue, + disconnect: OnDisconnect, pool: pool::Pool>, } impl Client { - pub(crate) fn new(state: State) -> Self { + pub(crate) fn new(io: IoBoxed) -> Self { let queue: Queue = Rc::new(RefCell::new(VecDeque::new())); // read redis response task - let state2 = state.clone(); + let io_ref = io.get_ref(); let queue2 = queue.clone(); ntex::rt::spawn(async move { - let read = state2.read(); + let read = io.read(); poll_fn(|cx| { loop { @@ -36,7 +38,7 @@ impl Client { let _ = tx.send(Err(e)); } queue2.borrow_mut().clear(); - state2.shutdown_io(); + let _ = io.poll_shutdown(cx); return Poll::Ready(()); } Ok(Some(item)) => { @@ -50,18 +52,24 @@ impl Client { } } - if !state2.is_open() { + if io.is_closed() { return Poll::Ready(()); } - state2.register_dispatcher(cx.waker()); - Poll::Pending + if let Err(_) = read.poll_read_ready(cx) { + Poll::Ready(()) + } else { + Poll::Pending + } }) .await }); + let disconnect = io_ref.on_disconnect(); + Client { - state, queue, + disconnect, + io: io_ref, pool: pool::new(), } } @@ -71,7 +79,7 @@ impl Client { where T: Command, { - let is_open = self.state.is_open(); + let is_open = !self.io.is_closed(); let fut = self.call(cmd.to_request()); async move { @@ -93,7 +101,7 @@ impl Client { /// Returns true if underlying transport is connected to redis pub fn is_connected(&self) -> bool { - self.state.is_open() + !self.io.is_closed() } } @@ -103,8 +111,8 @@ impl Service for Client { type Error = Error; type Future = Either>; - fn poll_ready(&self, _cx: &mut Context<'_>) -> Poll> { - if !self.state.is_open() { + fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + if self.disconnect.poll_ready(cx).is_ready() { Poll::Ready(Err(Error::Disconnected)) } else { Poll::Ready(Ok(())) @@ -112,7 +120,7 @@ impl Service for Client { } fn call(&self, req: Request) -> Self::Future { - if let Err(e) = self.state.write().encode(req, &Codec) { + if let Err(e) = self.io.write().encode(req, &Codec) { Either::Right(Ready::Err(e)) } else { let (tx, rx) = self.pool.channel(); @@ -125,7 +133,7 @@ impl Service for Client { impl fmt::Debug for Client { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Client") - .field("connected", &self.state.is_open()) + .field("connected", &!self.io.is_closed()) .finish() } } diff --git a/src/connector.rs b/src/connector.rs index 0e31428..1d435c2 100644 --- a/src/connector.rs +++ b/src/connector.rs @@ -1,8 +1,7 @@ -use std::{cell::RefCell, future::Future, rc::Rc}; +use std::future::Future; -use ntex::codec::{AsyncRead, AsyncWrite}; use ntex::connect::{self, Address, Connect, Connector}; -use ntex::framed::{ReadTask, State, WriteTask}; +use ntex::io::{Io, IoBoxed}; use ntex::{service::Service, time::Seconds, util::ByteString, util::PoolId, util::PoolRef}; #[cfg(feature = "openssl")] @@ -41,8 +40,7 @@ where impl RedisConnector where A: Address + Clone, - T: Service, Error = connect::ConnectError>, - T::Response: AsyncRead + AsyncWrite + Unpin + 'static, + T: Service, Response = IoBoxed, Error = connect::ConnectError>, { /// Add redis auth password pub fn password(mut self, password: U) -> Self @@ -63,26 +61,10 @@ where self } - #[doc(hidden)] - #[deprecated(since = "0.2.4", note = "Use memory pool config")] - #[inline] - /// Set read/write buffer params - /// - /// By default read buffer is 16kb, write buffer is 16kb - pub fn buffer_params( - self, - _max_read_buf_size: u16, - _max_write_buf_size: u16, - _min_buf_size: u16, - ) -> Self { - self - } - /// Use custom connector - pub fn connector(self, connector: U) -> RedisConnector + pub fn connector(self, connector: U) -> RedisConnector where - U: Service, Error = connect::ConnectError>, - U::Response: AsyncRead + AsyncWrite + Unpin + 'static, + U: Service, Response = Io, Error = connect::ConnectError>, { RedisConnector { connector, @@ -125,14 +107,10 @@ where async move { let io = fut.await?; + io.set_memory_pool(pool); + io.set_disconnect_timeout(Seconds::ZERO.into()); - let state = State::with_memory_pool(pool); - state.set_disconnect_timeout(Seconds::ZERO); - let io = Rc::new(RefCell::new(io)); - ntex::rt::spawn(ReadTask::new(io.clone(), state.clone())); - ntex::rt::spawn(WriteTask::new(io, state.clone())); - - let client = Client::new(state); + let client = Client::new(io); if passwords.is_empty() { Ok(client) @@ -155,14 +133,10 @@ where async move { let io = fut.await?; + io.set_memory_pool(pool); + io.set_disconnect_timeout(Seconds::ZERO.into()); - let state = State::with_memory_pool(pool); - state.set_disconnect_timeout(Seconds::ZERO); - let io = Rc::new(RefCell::new(io)); - ntex::rt::spawn(ReadTask::new(io.clone(), state.clone())); - ntex::rt::spawn(WriteTask::new(io, state.clone())); - - let mut client = SimpleClient::new(state); + let mut client = SimpleClient::new(io); if passwords.is_empty() { Ok(client) diff --git a/src/simple.rs b/src/simple.rs index 18e226e..b60440f 100644 --- a/src/simple.rs +++ b/src/simple.rs @@ -1,6 +1,6 @@ use std::task::Poll; -use ntex::{framed::State, util::poll_fn}; +use ntex::{io::IoBoxed, util::poll_fn}; use super::cmd::Command; use super::codec::Codec; @@ -8,13 +8,13 @@ use super::errors::{CommandError, Error}; /// Redis client pub struct SimpleClient { - state: State, + io: IoBoxed, } impl SimpleClient { /// Create new simple client - pub(crate) fn new(state: State) -> Self { - SimpleClient { state } + pub(crate) fn new(io: IoBoxed) -> Self { + SimpleClient { io } } /// Execute redis command @@ -22,9 +22,9 @@ impl SimpleClient { where U: Command, { - self.state.write().encode(cmd.to_request(), &Codec)?; + self.io.write().encode(cmd.to_request(), &Codec)?; - let read = self.state.read(); + let read = self.io.read(); poll_fn(|cx| { if let Some(item) = read.decode(&Codec)? { return Poll::Ready(U::to_output( @@ -32,12 +32,11 @@ impl SimpleClient { )); } - if !self.state.is_open() { - return Poll::Ready(Err(CommandError::Protocol(Error::Disconnected))); + if let Err(err) = read.poll_read_ready(cx) { + Poll::Ready(Err(CommandError::Protocol(Error::Io(err)))) + } else { + Poll::Pending } - - self.state.register_dispatcher(cx.waker()); - Poll::Pending }) .await } From 67a6850f95bf2e483027a0bb03a77b09bfe900e8 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 19 Dec 2021 18:19:40 +0600 Subject: [PATCH 2/4] fix connector definitions --- src/client.rs | 2 +- src/connector.rs | 45 +++++++++++++++++++++++++++++++++------------ 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/src/client.rs b/src/client.rs index 1f93080..75ba682 100644 --- a/src/client.rs +++ b/src/client.rs @@ -55,7 +55,7 @@ impl Client { if io.is_closed() { return Poll::Ready(()); } - if let Err(_) = read.poll_read_ready(cx) { + if read.poll_read_ready(cx).is_err() { Poll::Ready(()) } else { Poll::Pending diff --git a/src/connector.rs b/src/connector.rs index 1d435c2..14ded0a 100644 --- a/src/connector.rs +++ b/src/connector.rs @@ -1,14 +1,14 @@ use std::future::Future; use ntex::connect::{self, Address, Connect, Connector}; -use ntex::io::{Io, IoBoxed}; +use ntex::io::{Filter, Io, IoBoxed}; use ntex::{service::Service, time::Seconds, util::ByteString, util::PoolId, util::PoolRef}; #[cfg(feature = "openssl")] -use ntex::connect::openssl::{OpensslConnector, SslConnector}; +use ntex::connect::openssl::{self, SslConnector}; #[cfg(feature = "rustls")] -use ntex::connect::rustls::{ClientConfig, RustlsConnector}; +use ntex::connect::rustls::{self, ClientConfig}; use super::errors::ConnectError; use super::{cmd, Client, SimpleClient}; @@ -27,11 +27,16 @@ where { #[allow(clippy::new_ret_no_self)] /// Create new redis connector - pub fn new(address: A) -> RedisConnector> { + pub fn new( + address: A, + ) -> RedisConnector< + A, + impl Service, Response = IoBoxed, Error = connect::ConnectError>, + > { RedisConnector { address, passwords: Vec::new(), - connector: Connector::default(), + connector: Connector::default().map(|io| io.into_boxed()), pool: PoolId::P7.pool_ref(), } } @@ -62,12 +67,19 @@ where } /// Use custom connector - pub fn connector(self, connector: U) -> RedisConnector + pub fn connector( + self, + connector: U, + ) -> RedisConnector< + A, + impl Service, Response = IoBoxed, Error = connect::ConnectError>, + > where + F: Filter, U: Service, Response = Io, Error = connect::ConnectError>, { RedisConnector { - connector, + connector: connector.map(|io| io.into_boxed()), address: self.address, passwords: self.passwords, pool: self.pool, @@ -76,11 +88,17 @@ where #[cfg(feature = "openssl")] /// Use openssl connector. - pub fn openssl(self, connector: SslConnector) -> RedisConnector> { + pub fn openssl( + self, + connector: SslConnector, + ) -> RedisConnector< + A, + impl Service, Response = IoBoxed, Error = connect::ConnectError>, + > { RedisConnector { address: self.address, passwords: self.passwords, - connector: OpensslConnector::new(connector), + connector: openssl::Connector::new(connector).map(|io| io.into_boxed()), pool: self.pool, } } @@ -89,12 +107,15 @@ where /// Use rustls connector. pub fn rustls( self, - config: std::sync::Arc, - ) -> RedisConnector> { + config: ClientConfig, + ) -> RedisConnector< + A, + impl Service, Response = IoBoxed, Error = connect::ConnectError>, + > { RedisConnector { address: self.address, passwords: self.passwords, - connector: RustlsConnector::new(config), + connector: rustls::Connector::new(config).map(|io| io.into_boxed()), pool: self.pool, } } From 376b68656b1eca13bdc59f643b90f61b1d34ad6c Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 20 Dec 2021 21:41:46 +0600 Subject: [PATCH 3/4] update to 0.5.0-b.1 --- Cargo.toml | 2 +- src/client.rs | 52 +++++++++++++++++++++--------------------------- src/connector.rs | 2 +- src/simple.rs | 26 ++++++++++-------------- 4 files changed, 35 insertions(+), 47 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5d8a816..7ccafa3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ openssl = ["ntex/openssl"] rustls = ["ntex/rustls"] [dependencies] -ntex = "0.5.0-b.0" +ntex = "0.5.0-b.1" itoa = "0.4.5" btoi = "0.4.2" log = "0.4" diff --git a/src/client.rs b/src/client.rs index 75ba682..94e77c5 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll}; use ntex::io::{IoBoxed, IoRef, OnDisconnect}; -use ntex::util::{poll_fn, Either, Ready}; +use ntex::util::{poll_fn, ready, Either, Ready}; use ntex::{channel::pool, service::Service}; use super::cmd::Command; @@ -28,38 +28,32 @@ impl Client { let io_ref = io.get_ref(); let queue2 = queue.clone(); ntex::rt::spawn(async move { - let read = io.read(); - - poll_fn(|cx| { - loop { - match read.decode(&Codec) { - Err(e) => { - if let Some(tx) = queue2.borrow_mut().pop_front() { - let _ = tx.send(Err(e)); - } - queue2.borrow_mut().clear(); - let _ = io.poll_shutdown(cx); - return Poll::Ready(()); - } - Ok(Some(item)) => { - if let Some(tx) = queue2.borrow_mut().pop_front() { - let _ = tx.send(Ok(item)); - } else { - log::error!("Unexpected redis response: {:?}", item); - } - } - Ok(None) => break, + poll_fn(|cx| match ready!(io.poll_read_next(&Codec, cx)) { + Some(Ok(item)) => { + if let Some(tx) = queue2.borrow_mut().pop_front() { + let _ = tx.send(Ok(item)); + } else { + log::error!("Unexpected redis response: {:?}", item); } + Poll::Pending } - - if io.is_closed() { + Some(Err(Either::Left(e))) => { + if let Some(tx) = queue2.borrow_mut().pop_front() { + let _ = tx.send(Err(e)); + } + queue2.borrow_mut().clear(); + let _ = ready!(io.poll_shutdown(cx)); return Poll::Ready(()); } - if read.poll_read_ready(cx).is_err() { - Poll::Ready(()) - } else { - Poll::Pending + Some(Err(Either::Right(e))) => { + if let Some(tx) = queue2.borrow_mut().pop_front() { + let _ = tx.send(Err(e.into())); + } + queue2.borrow_mut().clear(); + let _ = ready!(io.poll_shutdown(cx)); + return Poll::Ready(()); } + None => Poll::Ready(()), }) .await }); @@ -120,7 +114,7 @@ impl Service for Client { } fn call(&self, req: Request) -> Self::Future { - if let Err(e) = self.io.write().encode(req, &Codec) { + if let Err(e) = self.io.encode(req, &Codec) { Either::Right(Ready::Err(e)) } else { let (tx, rx) = self.pool.channel(); diff --git a/src/connector.rs b/src/connector.rs index 14ded0a..04a3a89 100644 --- a/src/connector.rs +++ b/src/connector.rs @@ -157,7 +157,7 @@ where io.set_memory_pool(pool); io.set_disconnect_timeout(Seconds::ZERO.into()); - let mut client = SimpleClient::new(io); + let client = SimpleClient::new(io); if passwords.is_empty() { Ok(client) diff --git a/src/simple.rs b/src/simple.rs index b60440f..3449fc3 100644 --- a/src/simple.rs +++ b/src/simple.rs @@ -1,6 +1,6 @@ use std::task::Poll; -use ntex::{io::IoBoxed, util::poll_fn}; +use ntex::{io::IoBoxed, util::poll_fn, util::ready, util::Either}; use super::cmd::Command; use super::codec::Codec; @@ -18,25 +18,19 @@ impl SimpleClient { } /// Execute redis command - pub async fn exec(&mut self, cmd: U) -> Result + pub async fn exec(&self, cmd: U) -> Result where U: Command, { - self.io.write().encode(cmd.to_request(), &Codec)?; + self.io.encode(cmd.to_request(), &Codec)?; - let read = self.io.read(); - poll_fn(|cx| { - if let Some(item) = read.decode(&Codec)? { - return Poll::Ready(U::to_output( - item.into_result().map_err(CommandError::Error)?, - )); - } - - if let Err(err) = read.poll_read_ready(cx) { - Poll::Ready(Err(CommandError::Protocol(Error::Io(err)))) - } else { - Poll::Pending - } + poll_fn(|cx| match ready!(self.io.poll_read_next(&Codec, cx)) { + Some(Ok(item)) => Poll::Ready(U::to_output( + item.into_result().map_err(CommandError::Error)?, + )), + Some(Err(Either::Left(err))) => Poll::Ready(Err(CommandError::Protocol(err))), + Some(Err(Either::Right(err))) => Poll::Ready(Err(CommandError::Protocol(err.into()))), + None => Poll::Ready(Err(CommandError::Protocol(Error::Disconnected))), }) .await } From 41005e26d37b2128e45f0469aa017cc5eb87002b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 20 Dec 2021 22:11:05 +0600 Subject: [PATCH 4/4] fix loop --- src/client.rs | 46 +++++++++++++++++++++++---------------------- tests/test_redis.rs | 2 +- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/src/client.rs b/src/client.rs index 94e77c5..7071ef7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -28,32 +28,34 @@ impl Client { let io_ref = io.get_ref(); let queue2 = queue.clone(); ntex::rt::spawn(async move { - poll_fn(|cx| match ready!(io.poll_read_next(&Codec, cx)) { - Some(Ok(item)) => { - if let Some(tx) = queue2.borrow_mut().pop_front() { - let _ = tx.send(Ok(item)); - } else { - log::error!("Unexpected redis response: {:?}", item); + poll_fn(|cx| loop { + match ready!(io.poll_read_next(&Codec, cx)) { + Some(Ok(item)) => { + if let Some(tx) = queue2.borrow_mut().pop_front() { + let _ = tx.send(Ok(item)); + } else { + log::error!("Unexpected redis response: {:?}", item); + } + continue; } - Poll::Pending - } - Some(Err(Either::Left(e))) => { - if let Some(tx) = queue2.borrow_mut().pop_front() { - let _ = tx.send(Err(e)); + Some(Err(Either::Left(e))) => { + if let Some(tx) = queue2.borrow_mut().pop_front() { + let _ = tx.send(Err(e)); + } + queue2.borrow_mut().clear(); + let _ = ready!(io.poll_shutdown(cx)); + return Poll::Ready(()); } - queue2.borrow_mut().clear(); - let _ = ready!(io.poll_shutdown(cx)); - return Poll::Ready(()); - } - Some(Err(Either::Right(e))) => { - if let Some(tx) = queue2.borrow_mut().pop_front() { - let _ = tx.send(Err(e.into())); + Some(Err(Either::Right(e))) => { + if let Some(tx) = queue2.borrow_mut().pop_front() { + let _ = tx.send(Err(e.into())); + } + queue2.borrow_mut().clear(); + let _ = ready!(io.poll_shutdown(cx)); + return Poll::Ready(()); } - queue2.borrow_mut().clear(); - let _ = ready!(io.poll_shutdown(cx)); - return Poll::Ready(()); + None => return Poll::Ready(()), } - None => Poll::Ready(()), }) .await }); diff --git a/tests/test_redis.rs b/tests/test_redis.rs index 86ecd30..b2aec07 100644 --- a/tests/test_redis.rs +++ b/tests/test_redis.rs @@ -104,7 +104,7 @@ async fn test_keys() { #[ntex::test] async fn test_strings_simple() { - let mut redis = RedisConnector::new("127.0.0.1:6379") + let redis = RedisConnector::new("127.0.0.1:6379") .connect_simple() .await .unwrap();