Skip to content

Commit

Permalink
Upgrade ntex-io
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Nov 9, 2023
1 parent e4fdd9e commit b8b645d
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 21 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.4.4] - 2023-11-09

* Update ntex-io

## [0.4.3] - 2023-10-16

* Drop connection if client overflows concurrent streams number multiple times
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-h2"
version = "0.4.3"
version = "0.4.4"
license = "MIT"
authors = ["Nikolay Kim <[email protected]>"]
description = "An HTTP/2 client and server"
Expand All @@ -20,9 +20,9 @@ unstable = []

[dependencies]
ntex-connect = "0.3.2"
ntex-io = "0.3.4"
ntex-io = "0.3.6"
ntex-http = "0.1.10"
ntex-bytes = "0.1.19"
ntex-bytes = "0.1.20"
ntex-codec = "0.6.2"
ntex-service = "1.2.7"
ntex-util = "0.3.2"
Expand Down
10 changes: 6 additions & 4 deletions src/client/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{fmt, rc::Rc};
use ntex_bytes::ByteString;
use ntex_http::{uri::Scheme, HeaderMap, Method};
use ntex_io::{Dispatcher as IoDispatcher, IoBoxed, OnDisconnect};
use ntex_util::time::Seconds;

use crate::connection::Connection;
use crate::default::DefaultControlService;
Expand Down Expand Up @@ -54,10 +53,13 @@ impl SimpleClient {
DefaultControlService,
HandleService::new(storage.clone()),
);
let fut = IoDispatcher::new(io, con.codec().clone(), disp)
.keepalive_timeout(Seconds::ZERO)
.disconnect_timeout(con.config().disconnect_timeout.get());

let fut = IoDispatcher::with_config(
io,
con.codec().clone(),
disp,
&con.config().dispatcher_config,
);
ntex_rt::spawn(async move {
let _ = fut.await;
});
Expand Down
16 changes: 13 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{cell::Cell, fmt, rc::Rc, time::Duration};

use ntex_io::DispatcherConfig;
use ntex_util::{channel::pool, time::Seconds};

use crate::{consts, frame, frame::Settings, frame::WindowSize};
Expand Down Expand Up @@ -35,8 +36,8 @@ pub(crate) struct ConfigInner {
/// Connection timeouts
pub(crate) handshake_timeout: Cell<Seconds>,
pub(crate) client_timeout: Cell<Seconds>,
pub(crate) disconnect_timeout: Cell<Seconds>,
pub(crate) ping_timeout: Cell<Seconds>,
pub(crate) dispatcher_config: DispatcherConfig,

/// Config flags
flags: Cell<ConfigFlags>,
Expand Down Expand Up @@ -73,19 +74,24 @@ impl Config {
Cell::new(ConfigFlags::empty())
};

let dispatcher_config = DispatcherConfig::default();
dispatcher_config
.set_keepalive_timeout(Seconds(0))
.set_disconnect_timeout(Seconds(3));

Config(Rc::new(ConfigInner {
flags,
window_sz,
window_sz_threshold,
connection_window_sz,
connection_window_sz_threshold,
dispatcher_config,
settings: Cell::new(settings),
reset_max: Cell::new(consts::DEFAULT_RESET_STREAM_MAX),
reset_duration: Cell::new(consts::DEFAULT_RESET_STREAM_SECS.into()),
remote_max_concurrent_streams: Cell::new(None),
client_timeout: Cell::new(Seconds(0)),
handshake_timeout: Cell::new(Seconds(5)),
disconnect_timeout: Cell::new(Seconds(3)),
ping_timeout: Cell::new(Seconds(10)),
pool: pool::new(),
}))
Expand Down Expand Up @@ -290,7 +296,7 @@ impl Config {
///
/// By default disconnect timeout is set to 3 seconds.
pub fn disconnect_timeout(&self, val: Seconds) -> &Self {
self.0.disconnect_timeout.set(val);
self.0.dispatcher_config.set_disconnect_timeout(val);
self
}

Expand All @@ -306,6 +312,10 @@ impl Config {
pub fn is_server(&self) -> bool {
self.0.flags.get().contains(ConfigFlags::SERVER)
}

pub(crate) fn inner(&self) -> &ConfigInner {
self.0.as_ref()
}
}

impl fmt::Debug for Config {
Expand Down
13 changes: 13 additions & 0 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,19 @@ impl RecvHalfConnection {
streams
}

pub(crate) fn read_timeout(&self) -> HashMap<StreamId, StreamRef> {
self.0.error.set(Some(ConnectionError::ReadTimeout.into()));

let streams = mem::take(&mut *self.0.streams.borrow_mut());
for stream in streams.values() {
stream.set_failed_stream(ConnectionError::ReadTimeout.into())
}

self.encode(frame::GoAway::new(frame::Reason::NO_ERROR));
self.0.io.close();
streams
}

pub(crate) fn proto_error(&self, err: &ConnectionError) -> HashMap<StreamId, StreamRef> {
self.0.error.set(Some((*err).into()));
self.0.readiness.borrow_mut().clear();
Expand Down
10 changes: 10 additions & 0 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,16 @@ where
ctx,
)))
}
DispatchItem::ReadTimeout => {
log::warn!("did not receive complete frame in time, closing connection");
let streams = self.connection.read_timeout();
self.handle_connection_error(streams, ConnectionError::ReadTimeout.into());
Either::Right(Either::Left(ControlResponse::new(
ControlMessage::proto_error(ConnectionError::ReadTimeout),
&self.inner,
ctx,
)))
}
DispatchItem::Disconnect(err) => {
let streams = self.connection.disconnect();
self.handle_connection_error(streams, OperationError::Disconnected);
Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub enum ConnectionError {
/// Keep-alive timeout
#[error("Keep-alive timeout")]
KeepaliveTimeout,
/// Read timeout
#[error("Read timeout")]
ReadTimeout,
}

impl ConnectionError {
Expand Down Expand Up @@ -72,6 +75,9 @@ impl ConnectionError {
ConnectionError::KeepaliveTimeout => {
GoAway::new(Reason::NO_ERROR).set_data("Keep-alive timeout")
}
ConnectionError::ReadTimeout => {
GoAway::new(Reason::NO_ERROR).set_data("Frame read timeout")
}
}
}
}
Expand Down
28 changes: 17 additions & 11 deletions src/server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{fmt, rc::Rc};
use ntex_io::{Dispatcher as IoDispatcher, Filter, Io, IoBoxed};
use ntex_service::{Service, ServiceCtx, ServiceFactory};
use ntex_util::future::{BoxFuture, Ready};
use ntex_util::time::{sleep, timeout_checked, Seconds};
use ntex_util::time::{sleep, timeout_checked};

use crate::connection::{Connection, ConnectionFlags};
use crate::control::{ControlMessage, ControlResult};
Expand Down Expand Up @@ -143,11 +143,14 @@ where
let (codec, con) = create_connection(&io, &inner.config);

// start protocol dispatcher
IoDispatcher::new(io, codec, Dispatcher::new(con, ctl_srv, pub_srv))
.keepalive_timeout(Seconds::ZERO)
.disconnect_timeout(inner.config.0.disconnect_timeout.get())
.await
.map_err(|_| ServerError::Dispatcher)
IoDispatcher::with_config(
io,
codec,
Dispatcher::new(con, ctl_srv, pub_srv),
&inner.config.inner().dispatcher_config,
)
.await
.map_err(|_| ServerError::Dispatcher)
}
}

Expand Down Expand Up @@ -267,9 +270,12 @@ where
let (codec, con) = create_connection(&io, &config);

// start protocol dispatcher
IoDispatcher::new(io, codec, Dispatcher::new(con, ctl_svc, pub_svc))
.keepalive_timeout(Seconds::ZERO)
.disconnect_timeout(config.0.disconnect_timeout.get())
.await
.map_err(|_| ServerError::Dispatcher)
IoDispatcher::with_config(
io,
codec,
Dispatcher::new(con, ctl_svc, pub_svc),
&config.inner().dispatcher_config,
)
.await
.map_err(|_| ServerError::Dispatcher)
}

0 comments on commit b8b645d

Please sign in to comment.