Skip to content

Commit

Permalink
Merge pull request #3 from ntex-rs/ntex-io
Browse files Browse the repository at this point in the history
upgrade to ntex 0.5
  • Loading branch information
fafhrd91 authored Dec 20, 2021
2 parents 412f5bb + 41005e2 commit cd17426
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 108 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.3.0-b.0] - 2021-12-19

* upgrade to ntex 0.5

## [0.2.4] - 2021-12-02

* Add memory pools support
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-redis"
version = "0.2.4"
version = "0.3.0-b.0"
authors = ["ntex contributors <[email protected]>"]
description = "Redis client"
documentation = "https://docs.rs/ntex-redis"
Expand All @@ -21,7 +21,7 @@ openssl = ["ntex/openssl"]
rustls = ["ntex/rustls"]

[dependencies]
ntex = "0.4.11"
ntex = "0.5.0-b.1"
itoa = "0.4.5"
btoi = "0.4.2"
log = "0.4"
Expand Down
78 changes: 41 additions & 37 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::collections::VecDeque;
use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};

use ntex::util::{poll_fn, Either, Ready};
use ntex::{channel::pool, framed::State, service::Service};
use ntex::io::{IoBoxed, IoRef, OnDisconnect};
use ntex::util::{poll_fn, ready, Either, Ready};
use ntex::{channel::pool, service::Service};

use super::cmd::Command;
use super::codec::{Codec, Request, Response};
Expand All @@ -13,55 +14,58 @@ type Queue = Rc<RefCell<VecDeque<pool::Sender<Result<Response, Error>>>>>;
#[derive(Clone)]
/// Shared redis client
pub struct Client {
state: State,
io: IoRef,
queue: Queue,
disconnect: OnDisconnect,
pool: pool::Pool<Result<Response, Error>>,
}

impl Client {
pub(crate) fn new(state: State) -> Self {
pub(crate) fn new(io: IoBoxed) -> Self {
let queue: Queue = Rc::new(RefCell::new(VecDeque::new()));

// read redis response task
let state2 = state.clone();
let io_ref = io.get_ref();
let queue2 = queue.clone();
ntex::rt::spawn(async move {
let read = state2.read();

poll_fn(|cx| {
loop {
match read.decode(&Codec) {
Err(e) => {
if let Some(tx) = queue2.borrow_mut().pop_front() {
let _ = tx.send(Err(e));
}
queue2.borrow_mut().clear();
state2.shutdown_io();
return Poll::Ready(());
poll_fn(|cx| loop {
match ready!(io.poll_read_next(&Codec, cx)) {
Some(Ok(item)) => {
if let Some(tx) = queue2.borrow_mut().pop_front() {
let _ = tx.send(Ok(item));
} else {
log::error!("Unexpected redis response: {:?}", item);
}
Ok(Some(item)) => {
if let Some(tx) = queue2.borrow_mut().pop_front() {
let _ = tx.send(Ok(item));
} else {
log::error!("Unexpected redis response: {:?}", item);
}
continue;
}
Some(Err(Either::Left(e))) => {
if let Some(tx) = queue2.borrow_mut().pop_front() {
let _ = tx.send(Err(e));
}
Ok(None) => break,
queue2.borrow_mut().clear();
let _ = ready!(io.poll_shutdown(cx));
return Poll::Ready(());
}
Some(Err(Either::Right(e))) => {
if let Some(tx) = queue2.borrow_mut().pop_front() {
let _ = tx.send(Err(e.into()));
}
queue2.borrow_mut().clear();
let _ = ready!(io.poll_shutdown(cx));
return Poll::Ready(());
}
None => return Poll::Ready(()),
}

if !state2.is_open() {
return Poll::Ready(());
}
state2.register_dispatcher(cx.waker());
Poll::Pending
})
.await
});

let disconnect = io_ref.on_disconnect();

Client {
state,
queue,
disconnect,
io: io_ref,
pool: pool::new(),
}
}
Expand All @@ -71,7 +75,7 @@ impl Client {
where
T: Command,
{
let is_open = self.state.is_open();
let is_open = !self.io.is_closed();
let fut = self.call(cmd.to_request());

async move {
Expand All @@ -93,7 +97,7 @@ impl Client {

/// Returns true if underlying transport is connected to redis
pub fn is_connected(&self) -> bool {
self.state.is_open()
!self.io.is_closed()
}
}

Expand All @@ -103,16 +107,16 @@ impl Service for Client {
type Error = Error;
type Future = Either<CommandResult, Ready<Response, Error>>;

fn poll_ready(&self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if !self.state.is_open() {
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.disconnect.poll_ready(cx).is_ready() {
Poll::Ready(Err(Error::Disconnected))
} else {
Poll::Ready(Ok(()))
}
}

fn call(&self, req: Request) -> Self::Future {
if let Err(e) = self.state.write().encode(req, &Codec) {
if let Err(e) = self.io.encode(req, &Codec) {
Either::Right(Ready::Err(e))
} else {
let (tx, rx) = self.pool.channel();
Expand All @@ -125,7 +129,7 @@ impl Service for Client {
impl fmt::Debug for Client {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Client")
.field("connected", &self.state.is_open())
.field("connected", &!self.io.is_closed())
.finish()
}
}
Expand Down
89 changes: 42 additions & 47 deletions src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::{cell::RefCell, future::Future, rc::Rc};
use std::future::Future;

use ntex::codec::{AsyncRead, AsyncWrite};
use ntex::connect::{self, Address, Connect, Connector};
use ntex::framed::{ReadTask, State, WriteTask};
use ntex::io::{Filter, Io, IoBoxed};
use ntex::{service::Service, time::Seconds, util::ByteString, util::PoolId, util::PoolRef};

#[cfg(feature = "openssl")]
use ntex::connect::openssl::{OpensslConnector, SslConnector};
use ntex::connect::openssl::{self, SslConnector};

#[cfg(feature = "rustls")]
use ntex::connect::rustls::{ClientConfig, RustlsConnector};
use ntex::connect::rustls::{self, ClientConfig};

use super::errors::ConnectError;
use super::{cmd, Client, SimpleClient};
Expand All @@ -28,11 +27,16 @@ where
{
#[allow(clippy::new_ret_no_self)]
/// Create new redis connector
pub fn new(address: A) -> RedisConnector<A, Connector<A>> {
pub fn new(
address: A,
) -> RedisConnector<
A,
impl Service<Request = Connect<A>, Response = IoBoxed, Error = connect::ConnectError>,
> {
RedisConnector {
address,
passwords: Vec::new(),
connector: Connector::default(),
connector: Connector::default().map(|io| io.into_boxed()),
pool: PoolId::P7.pool_ref(),
}
}
Expand All @@ -41,8 +45,7 @@ where
impl<A, T> RedisConnector<A, T>
where
A: Address + Clone,
T: Service<Request = Connect<A>, Error = connect::ConnectError>,
T::Response: AsyncRead + AsyncWrite + Unpin + 'static,
T: Service<Request = Connect<A>, Response = IoBoxed, Error = connect::ConnectError>,
{
/// Add redis auth password
pub fn password<U>(mut self, password: U) -> Self
Expand All @@ -63,29 +66,20 @@ where
self
}

#[doc(hidden)]
#[deprecated(since = "0.2.4", note = "Use memory pool config")]
#[inline]
/// Set read/write buffer params
///
/// By default read buffer is 16kb, write buffer is 16kb
pub fn buffer_params(
self,
_max_read_buf_size: u16,
_max_write_buf_size: u16,
_min_buf_size: u16,
) -> Self {
self
}

/// Use custom connector
pub fn connector<U>(self, connector: U) -> RedisConnector<A, U>
pub fn connector<U, F>(
self,
connector: U,
) -> RedisConnector<
A,
impl Service<Request = Connect<A>, Response = IoBoxed, Error = connect::ConnectError>,
>
where
U: Service<Request = Connect<A>, Error = connect::ConnectError>,
U::Response: AsyncRead + AsyncWrite + Unpin + 'static,
F: Filter,
U: Service<Request = Connect<A>, Response = Io<F>, Error = connect::ConnectError>,
{
RedisConnector {
connector,
connector: connector.map(|io| io.into_boxed()),
address: self.address,
passwords: self.passwords,
pool: self.pool,
Expand All @@ -94,11 +88,17 @@ where

#[cfg(feature = "openssl")]
/// Use openssl connector.
pub fn openssl(self, connector: SslConnector) -> RedisConnector<A, OpensslConnector<A>> {
pub fn openssl(
self,
connector: SslConnector,
) -> RedisConnector<
A,
impl Service<Request = Connect<A>, Response = IoBoxed, Error = connect::ConnectError>,
> {
RedisConnector {
address: self.address,
passwords: self.passwords,
connector: OpensslConnector::new(connector),
connector: openssl::Connector::new(connector).map(|io| io.into_boxed()),
pool: self.pool,
}
}
Expand All @@ -107,12 +107,15 @@ where
/// Use rustls connector.
pub fn rustls(
self,
config: std::sync::Arc<ClientConfig>,
) -> RedisConnector<A, RustlsConnector<A>> {
config: ClientConfig,
) -> RedisConnector<
A,
impl Service<Request = Connect<A>, Response = IoBoxed, Error = connect::ConnectError>,
> {
RedisConnector {
address: self.address,
passwords: self.passwords,
connector: RustlsConnector::new(config),
connector: rustls::Connector::new(config).map(|io| io.into_boxed()),
pool: self.pool,
}
}
Expand All @@ -125,14 +128,10 @@ where

async move {
let io = fut.await?;
io.set_memory_pool(pool);
io.set_disconnect_timeout(Seconds::ZERO.into());

let state = State::with_memory_pool(pool);
state.set_disconnect_timeout(Seconds::ZERO);
let io = Rc::new(RefCell::new(io));
ntex::rt::spawn(ReadTask::new(io.clone(), state.clone()));
ntex::rt::spawn(WriteTask::new(io, state.clone()));

let client = Client::new(state);
let client = Client::new(io);

if passwords.is_empty() {
Ok(client)
Expand All @@ -155,14 +154,10 @@ where

async move {
let io = fut.await?;
io.set_memory_pool(pool);
io.set_disconnect_timeout(Seconds::ZERO.into());

let state = State::with_memory_pool(pool);
state.set_disconnect_timeout(Seconds::ZERO);
let io = Rc::new(RefCell::new(io));
ntex::rt::spawn(ReadTask::new(io.clone(), state.clone()));
ntex::rt::spawn(WriteTask::new(io, state.clone()));

let mut client = SimpleClient::new(state);
let client = SimpleClient::new(io);

if passwords.is_empty() {
Ok(client)
Expand Down
35 changes: 14 additions & 21 deletions src/simple.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,36 @@
use std::task::Poll;

use ntex::{framed::State, util::poll_fn};
use ntex::{io::IoBoxed, util::poll_fn, util::ready, util::Either};

use super::cmd::Command;
use super::codec::Codec;
use super::errors::{CommandError, Error};

/// Redis client
pub struct SimpleClient {
state: State,
io: IoBoxed,
}

impl SimpleClient {
/// Create new simple client
pub(crate) fn new(state: State) -> Self {
SimpleClient { state }
pub(crate) fn new(io: IoBoxed) -> Self {
SimpleClient { io }
}

/// Execute redis command
pub async fn exec<U>(&mut self, cmd: U) -> Result<U::Output, CommandError>
pub async fn exec<U>(&self, cmd: U) -> Result<U::Output, CommandError>
where
U: Command,
{
self.state.write().encode(cmd.to_request(), &Codec)?;

let read = self.state.read();
poll_fn(|cx| {
if let Some(item) = read.decode(&Codec)? {
return Poll::Ready(U::to_output(
item.into_result().map_err(CommandError::Error)?,
));
}

if !self.state.is_open() {
return Poll::Ready(Err(CommandError::Protocol(Error::Disconnected)));
}

self.state.register_dispatcher(cx.waker());
Poll::Pending
self.io.encode(cmd.to_request(), &Codec)?;

poll_fn(|cx| match ready!(self.io.poll_read_next(&Codec, cx)) {
Some(Ok(item)) => Poll::Ready(U::to_output(
item.into_result().map_err(CommandError::Error)?,
)),
Some(Err(Either::Left(err))) => Poll::Ready(Err(CommandError::Protocol(err))),
Some(Err(Either::Right(err))) => Poll::Ready(Err(CommandError::Protocol(err.into()))),
None => Poll::Ready(Err(CommandError::Protocol(Error::Disconnected))),
})
.await
}
Expand Down
Loading

0 comments on commit cd17426

Please sign in to comment.