diff --git a/examples/query-routing.rs b/examples/query-routing.rs index a0b829133..fc1b2b2da 100644 --- a/examples/query-routing.rs +++ b/examples/query-routing.rs @@ -39,7 +39,7 @@ async fn main() { .ok(); // Start building the query router plus upstreams. - let mut qr: QnameRouter, Vec, ReplyMessage> = + let mut qr: QnameRouter, Vec, (), ReplyMessage> = QnameRouter::new(); // Queries to the root go to 2606:4700:4700::1111 and 1.1.1.1. @@ -57,8 +57,8 @@ async fn main() { let conn_service = ClientTransportToSingleService::new(redun); qr.add(Name::>::from_str("nl").unwrap(), conn_service); - let srv = SingleServiceToService::new(qr); - let srv = MandatoryMiddlewareSvc::, _, _>::new(srv); + let srv = SingleServiceToService::<_, _, _, _>::new(qr); + let srv = MandatoryMiddlewareSvc::new(srv); let my_svc = Arc::new(srv); let udpsocket = UdpSocket::bind("[::1]:8053").await.unwrap(); diff --git a/examples/serve-zone.rs b/examples/serve-zone.rs index c82b30ceb..109676e24 100644 --- a/examples/serve-zone.rs +++ b/examples/serve-zone.rs @@ -118,16 +118,16 @@ async fn main() { let svc = service_fn(my_service, zones.clone()); #[cfg(feature = "siphasher")] - let svc = CookiesMiddlewareSvc::, _, _>::with_random_secret(svc); - let svc = EdnsMiddlewareSvc::, _, _>::new(svc); let svc = XfrMiddlewareSvc::, _, _, _>::new( svc, zones_and_diffs.clone(), 1, ); let svc = NotifyMiddlewareSvc::new(svc, DemoNotifyTarget); + let svc = TsigMiddlewareSvc::<_, _, _, ()>::new(svc, key_store); + let svc = CookiesMiddlewareSvc::, _, _>::with_random_secret(svc); + let svc = EdnsMiddlewareSvc::, _, _>::new(svc); let svc = MandatoryMiddlewareSvc::, _, _>::new(svc); - let svc = TsigMiddlewareSvc::new(svc, key_store); let svc = Arc::new(svc); let sock = UdpSocket::bind(&addr).await.unwrap(); @@ -135,15 +135,18 @@ async fn main() { let mut udp_metrics = vec![]; let num_cores = std::thread::available_parallelism().unwrap().get(); for _i in 0..num_cores { - let udp_srv = - DgramServer::new(sock.clone(), VecBufSource, svc.clone()); + let udp_srv = DgramServer::<_, _, _>::new( + sock.clone(), + VecBufSource, + svc.clone(), + ); let metrics = udp_srv.metrics(); udp_metrics.push(metrics); tokio::spawn(async move { udp_srv.run().await }); } let sock = TcpListener::bind(addr).await.unwrap(); - let tcp_srv = StreamServer::new(sock, VecBufSource, svc); + let tcp_srv = StreamServer::<_, _, _>::new(sock, VecBufSource, svc); let tcp_metrics = tcp_srv.metrics(); tokio::spawn(async move { tcp_srv.run().await }); @@ -240,8 +243,8 @@ async fn main() { } #[allow(clippy::type_complexity)] -fn my_service( - request: Request>, +fn my_service( + request: Request, RequestMeta>, zones: Arc, ) -> ServiceResult> { let question = request.message().sole_question().unwrap(); @@ -317,12 +320,12 @@ impl ZoneTreeWithDiffs { } } -impl XfrDataProvider for ZoneTreeWithDiffs { +impl XfrDataProvider> for ZoneTreeWithDiffs { type Diff = InMemoryZoneDiff; fn request( &self, - req: &Request, + req: &Request>, diff_from: Option, ) -> Pin< Box< @@ -338,6 +341,10 @@ impl XfrDataProvider for ZoneTreeWithDiffs { where Octs: Octets + Send + Sync, { + if req.metadata().is_none() { + eprintln!("Rejecting"); + return Box::pin(ready(Err(XfrDataProviderError::Refused))); + } let res = req .message() .sole_question() diff --git a/examples/server-transports.rs b/examples/server-transports.rs index 16bcff007..159d6a691 100644 --- a/examples/server-transports.rs +++ b/examples/server-transports.rs @@ -7,6 +7,7 @@ use core::time::Duration; use std::fs::File; use std::io; use std::io::BufReader; +use std::marker::Unpin; use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; @@ -50,7 +51,7 @@ use domain::rdata::{Soa, A}; // Helper fn to create a dummy response to send back to the client fn mk_answer( - msg: &Request>, + msg: &Request, ()>, builder: MessageBuilder>, ) -> Result>, PushError> where @@ -69,7 +70,7 @@ where } fn mk_soa_answer( - msg: &Request>, + msg: &Request, ()>, builder: MessageBuilder>, ) -> Result>, PushError> where @@ -100,6 +101,7 @@ where //--- MySingleResultService +#[derive(Clone)] struct MySingleResultService; /// This example shows how to implement the [`Service`] trait directly. @@ -116,12 +118,12 @@ struct MySingleResultService; /// /// See [`query`] and [`name_to_ip`] for ways of implementing the [`Service`] /// trait for a function instead of a struct. -impl Service> for MySingleResultService { +impl Service, ()> for MySingleResultService { type Target = Vec; type Stream = Once>>; type Future = Ready; - fn call(&self, request: Request>) -> Self::Future { + fn call(&self, request: Request, ()>) -> Self::Future { let builder = mk_builder_for_target(); let additional = mk_answer(&request, builder).unwrap(); let item = Ok(CallResult::new(additional)); @@ -131,6 +133,7 @@ impl Service> for MySingleResultService { //--- MyAsyncStreamingService +#[derive(Clone)] struct MyAsyncStreamingService; /// This example also shows how to implement the [`Service`] trait directly. @@ -147,13 +150,13 @@ struct MyAsyncStreamingService; /// and/or Stream implementations that actually wait and/or stream, e.g. /// making the Stream type be UnboundedReceiver instead of Pin>. -impl Service> for MyAsyncStreamingService { +impl Service, ()> for MyAsyncStreamingService { type Target = Vec; type Stream = Pin> + Send>>; type Future = Pin + Send>>; - fn call(&self, request: Request>) -> Self::Future { + fn call(&self, request: Request, ()>) -> Self::Future { Box::pin(async move { if !matches!( request @@ -209,7 +212,10 @@ impl Service> for MyAsyncStreamingService { /// The function signature is slightly more complex than when using /// [`service_fn`] (see the [`query`] example below). #[allow(clippy::type_complexity)] -fn name_to_ip(request: Request>, _: ()) -> ServiceResult> { +fn name_to_ip( + request: Request, ()>, + _: (), +) -> ServiceResult> { let mut out_answer = None; if let Ok(question) = request.message().sole_question() { let qname = question.qname(); @@ -257,7 +263,7 @@ fn name_to_ip(request: Request>, _: ()) -> ServiceResult> { /// [`service_fn`] and supports passing in meta data without any extra /// boilerplate. fn query( - request: Request>, + request: Request, ()>, count: Arc, ) -> ServiceResult> { let cnt = count @@ -455,6 +461,7 @@ impl std::fmt::Display for Stats { } } +#[derive(Clone)] pub struct StatsMiddlewareSvc { svc: Svc, stats: Arc>, @@ -467,7 +474,7 @@ impl StatsMiddlewareSvc { Self { svc, stats } } - fn preprocess(&self, request: &Request) + fn preprocess(&self, request: &Request) where RequestOctets: Octets + Send + Sync + Unpin, { @@ -488,12 +495,12 @@ impl StatsMiddlewareSvc { } fn postprocess( - request: &Request, + request: &Request, response: &AdditionalBuilder>, stats: &RwLock, ) where RequestOctets: Octets + Send + Sync + Unpin, - Svc: Service, + Svc: Service, Svc::Target: AsRef<[u8]>, { let duration = Instant::now().duration_since(request.received_at()); @@ -510,13 +517,13 @@ impl StatsMiddlewareSvc { } fn map_stream_item( - request: Request, + request: Request, stream_item: ServiceResult, stats: &mut Arc>, ) -> ServiceResult where RequestOctets: Octets + Send + Sync + Unpin, - Svc: Service, + Svc: Service, Svc::Target: AsRef<[u8]>, { if let Ok(cr) = &stream_item { @@ -528,10 +535,11 @@ impl StatsMiddlewareSvc { } } -impl Service for StatsMiddlewareSvc +impl Service + for StatsMiddlewareSvc where RequestOctets: Octets + Send + Sync + 'static + Unpin, - Svc: Service, + Svc: Service, Svc::Target: AsRef<[u8]>, Svc::Future: Unpin, { @@ -551,7 +559,7 @@ where >; type Future = Ready; - fn call(&self, request: Request) -> Self::Future { + fn call(&self, request: Request) -> Self::Future { self.preprocess(&request); let svc_call_fut = self.svc.call(request.clone()); let map = PostprocessingStream::new( @@ -567,24 +575,19 @@ where //------------ build_middleware_chain() -------------------------------------- #[allow(clippy::type_complexity)] -fn build_middleware_chain( +fn build_middleware_chain( svc: Svc, stats: Arc>, -) -> StatsMiddlewareSvc< - MandatoryMiddlewareSvc< - Vec, - EdnsMiddlewareSvc< - Vec, - CookiesMiddlewareSvc, Svc, ()>, - (), - >, - (), - >, -> { +) -> impl Service +where + Octs: Octets + Send + Sync + Clone + Unpin + 'static, + Svc: Service, + >::Future: Unpin, +{ #[cfg(feature = "siphasher")] - let svc = CookiesMiddlewareSvc::, _, _>::with_random_secret(svc); - let svc = EdnsMiddlewareSvc::, _, _>::new(svc); - let svc = MandatoryMiddlewareSvc::, _, _>::new(svc); + let svc = CookiesMiddlewareSvc::::with_random_secret(svc); + let svc = EdnsMiddlewareSvc::new(svc); + let svc = MandatoryMiddlewareSvc::new(svc); StatsMiddlewareSvc::new(svc, stats.clone()) } diff --git a/src/net/server/adapter.rs b/src/net/server/adapter.rs index a28c1d80d..5a9c785d6 100644 --- a/src/net/server/adapter.rs +++ b/src/net/server/adapter.rs @@ -31,15 +31,30 @@ use std::string::ToString; use std::vec::Vec; /// Provide a [Service] trait for an object that implements [SingleService]. -pub struct SingleServiceToService { +pub struct SingleServiceToService +where + RequestMeta: Clone + Default, + RequestOcts: Octets + Send + Sync, + SVC: SingleService, + CR: ComposeReply + 'static, + Self: Send + Sync + 'static, +{ /// Service that is wrapped by this object. service: SVC, /// Phantom field for RequestOcts and CR. - _phantom: PhantomData<(RequestOcts, CR)>, + _phantom: PhantomData<(RequestOcts, CR, RequestMeta)>, } -impl SingleServiceToService { +impl + SingleServiceToService +where + RequestMeta: Clone + Default, + RequestOcts: Octets + Send + Sync, + SVC: SingleService, + CR: ComposeReply + 'static, + Self: Send + Sync + 'static, +{ /// Create a new [SingleServiceToService] object. pub fn new(service: SVC) -> Self { Self { @@ -49,18 +64,23 @@ impl SingleServiceToService { } } -impl Service - for SingleServiceToService +impl Service + for SingleServiceToService where + RequestMeta: Clone + Default, RequestOcts: Octets + Send + Sync, - SVC: SingleService, + SVC: SingleService, CR: ComposeReply + 'static, + Self: Send + Sync + 'static, { type Target = Vec; type Stream = Once>>; type Future = Pin + Send>>; - fn call(&self, request: Request) -> Self::Future { + fn call( + &self, + request: Request, + ) -> Self::Future { let fut = self.service.call(request); let fut = async move { let reply = match fut.await { @@ -114,7 +134,8 @@ where } } -impl SingleService +impl + SingleService for ClientTransportToSingleService where RequestOcts: AsRef<[u8]> + Clone + Debug + Octets + Send + Sync, @@ -123,7 +144,7 @@ where { fn call( &self, - request: Request, + request: Request, ) -> Pin> + Send + Sync>> where RequestOcts: AsRef<[u8]>, @@ -194,7 +215,7 @@ where } } -impl SingleService +impl SingleService for BoxClientTransportToSingleService where RequestOcts: AsRef<[u8]> + Clone + Debug + Octets + Send + Sync, @@ -202,7 +223,7 @@ where { fn call( &self, - request: Request, + request: Request, ) -> Pin> + Send + Sync>> where RequestOcts: AsRef<[u8]>, diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs index a09609a05..e200e7e80 100644 --- a/src/net/server/connection.rs +++ b/src/net/server/connection.rs @@ -21,7 +21,6 @@ use tokio::time::{sleep_until, timeout}; use tracing::{debug, error, trace, warn}; use crate::base::message_builder::AdditionalBuilder; -use crate::base::wire::Composer; use crate::base::{Message, StreamTarget}; use crate::net::server::buf::BufSource; use crate::net::server::message::Request; @@ -219,11 +218,12 @@ impl Clone for Config { //------------ Connection ---------------------------------------------------- /// A handler for a single stream connection between client and server. -pub struct Connection +pub struct Connection where + RequestMeta: Default + Clone + Send + 'static, Buf: BufSource, Buf::Output: Send + Sync + Unpin, - Svc: Service + Clone, + Svc: Service + Clone, { /// Flag used by the Drop impl to track if the metric count has to be /// decreased or not. @@ -270,12 +270,13 @@ where /// Creation /// -impl Connection +impl Connection where + RequestMeta: Default + Clone + Send + 'static, Stream: AsyncRead + AsyncWrite, Buf: BufSource, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Clone, + Svc: Service + Clone, { /// Creates a new handler for an accepted stream connection. #[must_use] @@ -340,14 +341,13 @@ where /// Control /// -impl Connection +impl Connection where + RequestMeta: Default + Clone + Send + 'static, Stream: AsyncRead + AsyncWrite + Send + Sync + 'static, Buf: BufSource + Send + Sync + Clone + 'static, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Clone + Send + Sync + 'static, - Svc::Target: Composer + Send, - Svc::Stream: Send, + Svc: Service + Clone, { /// Start reading requests and writing responses to the stream. /// @@ -377,15 +377,13 @@ where //--- Internal details -impl Connection +impl Connection where + RequestMeta: Default + Clone + Send + 'static, Stream: AsyncRead + AsyncWrite + Send + Sync + 'static, Buf: BufSource + Send + Sync + Clone + 'static, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Clone + Send + Sync + 'static, - Svc::Target: Composer + Send, - Svc::Future: Send, - Svc::Stream: Send, + Svc: Service + Clone, { /// Connection handler main loop. async fn run_until_error( @@ -685,7 +683,7 @@ where received_at, msg, ctx, - (), + Default::default(), ); let svc = self.service.clone(); @@ -799,11 +797,13 @@ where //--- Drop -impl Drop for Connection +impl Drop + for Connection where + RequestMeta: Default + Clone + Send + 'static, Buf: BufSource, Buf::Output: Send + Sync + Unpin, - Svc: Service + Clone, + Svc: Service + Clone, { fn drop(&mut self) { if self.active { diff --git a/src/net/server/dgram.rs b/src/net/server/dgram.rs index 4da2e261c..f9d896700 100644 --- a/src/net/server/dgram.rs +++ b/src/net/server/dgram.rs @@ -33,7 +33,6 @@ use tokio::time::Instant; use tokio::time::MissedTickBehavior; use tracing::{error, trace, warn}; -use crate::base::wire::Composer; use crate::base::Message; use crate::net::server::buf::BufSource; use crate::net::server::error::Error; @@ -251,14 +250,7 @@ where Sock: AsyncDgramSock + Send + Sync + 'static, Buf: BufSource + Send + Sync, ::Output: Octets + Send + Sync + Unpin + 'static, - Svc: Clone - + Service<::Output, ()> - + Send - + Sync - + 'static, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Svc: Service<::Output, ()> + Clone, { /// The configuration of the server. config: Arc>, @@ -295,10 +287,7 @@ where Sock: AsyncDgramSock + Send + Sync, Buf: BufSource + Send + Sync, ::Output: Octets + Send + Sync + Unpin, - Svc: Clone + Service<::Output, ()> + Send + Sync, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Svc: Service<::Output, ()> + Clone, { /// Constructs a new [`DgramServer`] with default configuration. /// @@ -352,10 +341,7 @@ where Sock: AsyncDgramSock + Send + Sync, Buf: BufSource + Send + Sync, ::Output: Octets + Send + Sync + Unpin, - Svc: Clone + Service<::Output, ()> + Send + Sync, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Svc: Service<::Output, ()> + Clone, { /// Get a reference to the network source being used to receive messages. #[must_use] @@ -377,14 +363,7 @@ where Sock: AsyncDgramSock + Send + Sync + 'static, Buf: BufSource + Send + Sync, ::Output: Octets + Send + Sync + 'static + Unpin, - Svc: Clone - + Service<::Output, ()> - + Send - + Sync - + 'static, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Svc: Service<::Output, ()> + Clone, { /// Start the server. /// @@ -465,10 +444,7 @@ where Sock: AsyncDgramSock + Send + Sync, Buf: BufSource + Send + Sync, ::Output: Octets + Send + Sync + Unpin, - Svc: Clone + Service<::Output, ()> + Send + Sync, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Svc: Service<::Output, ()> + Clone, { /// Receive incoming messages until shutdown or fatal error. async fn run_until_error(&self) -> Result<(), String> { @@ -678,14 +654,7 @@ where Sock: AsyncDgramSock + Send + Sync + 'static, Buf: BufSource + Send + Sync, ::Output: Octets + Send + Sync + Unpin + 'static, - Svc: Clone - + Service<::Output, ()> - + Send - + Sync - + 'static, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Svc: Service<::Output, ()> + Clone, { fn drop(&mut self) { // Shutdown the DgramServer. Don't handle the failure case here as diff --git a/src/net/server/message.rs b/src/net/server/message.rs index 197ab0718..6e7687daf 100644 --- a/src/net/server/message.rs +++ b/src/net/server/message.rs @@ -166,7 +166,7 @@ impl From for TransportSpecificContext { /// message itself but also on the circumstances surrounding its creation and /// delivery. #[derive(Debug)] -pub struct Request +pub struct Request where Octs: AsRef<[u8]> + Send + Sync, { @@ -191,7 +191,7 @@ where /// still possible to generate responses that ignore this value. num_reserved_bytes: u16, - /// user defined metadata to associate with the request. + /// User defined metadata to associate with the request. /// /// For example this could be used to pass data from one [middleware] /// [`Service`] impl to another. @@ -298,12 +298,12 @@ where //--- TryFrom> for RequestMessage> -impl TryFrom> - for RequestMessage +impl + TryFrom> for RequestMessage { type Error = request::Error; - fn try_from(req: Request) -> Result { + fn try_from(req: Request) -> Result { // Copy the ECS option from the message. This is just an example, // there should be a separate plugin that deals with ECS. diff --git a/src/net/server/middleware/cookies.rs b/src/net/server/middleware/cookies.rs index fd6b089be..4344cdc64 100644 --- a/src/net/server/middleware/cookies.rs +++ b/src/net/server/middleware/cookies.rs @@ -14,7 +14,7 @@ use crate::base::iana::{OptRcode, Rcode}; use crate::base::message_builder::AdditionalBuilder; use crate::base::net::IpAddr; use crate::base::opt; -use crate::base::wire::{Composer, ParseError}; +use crate::base::wire::ParseError; use crate::base::{Serial, StreamTarget}; use crate::net::server::message::Request; use crate::net::server::middleware::stream::MiddlewareStream; @@ -46,7 +46,13 @@ const ONE_HOUR_AS_SECS: u32 = 60 * 60; /// [7873]: https://datatracker.ietf.org/doc/html/rfc7873 /// [9018]: https://datatracker.ietf.org/doc/html/rfc7873 #[derive(Clone, Debug)] -pub struct CookiesMiddlewareSvc { +pub struct CookiesMiddlewareSvc +where + NextSvc: Service, + NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Send + Sync + 'static, +{ /// The upstream [`Service`] to pass requests to and receive responses /// from. next_svc: NextSvc, @@ -70,6 +76,11 @@ pub struct CookiesMiddlewareSvc { impl CookiesMiddlewareSvc +where + NextSvc: Service, + NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Send + Sync + 'static, { /// Creates an instance of this middleware service. #[must_use] @@ -108,10 +119,10 @@ impl impl CookiesMiddlewareSvc where - RequestOctets: Octets + Send + Sync + Unpin, - RequestMeta: Clone + Default, NextSvc: Service, - NextSvc::Target: Composer + Default, + NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Send + Sync + 'static, { /// Get the DNS COOKIE, if any, for the given message. /// @@ -457,11 +468,10 @@ where impl Service for CookiesMiddlewareSvc where - RequestOctets: Octets + Send + Sync + 'static + Unpin, - RequestMeta: Clone + Default, NextSvc: Service, - NextSvc::Target: Composer + Default, NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Send + Sync + 'static, { type Target = NextSvc::Target; type Stream = MiddlewareStream< @@ -534,7 +544,7 @@ mod tests { ); fn my_service( - _req: Request>, + _req: Request, ()>, _meta: (), ) -> ServiceResult> { // For each request create a single response: diff --git a/src/net/server/middleware/edns.rs b/src/net/server/middleware/edns.rs index 880a12b8c..75df8dc04 100644 --- a/src/net/server/middleware/edns.rs +++ b/src/net/server/middleware/edns.rs @@ -12,7 +12,6 @@ use crate::base::iana::OptRcode; use crate::base::message_builder::AdditionalBuilder; use crate::base::opt::keepalive::IdleTimeout; use crate::base::opt::{ComposeOptData, Opt, OptRecord, TcpKeepalive}; -use crate::base::wire::Composer; use crate::base::{Message, Name, StreamTarget}; use crate::net::server::message::{Request, TransportSpecificContext}; use crate::net::server::middleware::stream::MiddlewareStream; @@ -47,7 +46,13 @@ const EDNS_VERSION_ZERO: u8 = 0; /// [7828]: https://datatracker.ietf.org/doc/html/rfc7828 /// [9210]: https://datatracker.ietf.org/doc/html/rfc9210 #[derive(Clone, Debug, Default)] -pub struct EdnsMiddlewareSvc { +pub struct EdnsMiddlewareSvc +where + NextSvc: Service, + NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Unpin + Send + Sync + 'static, +{ /// The upstream [`Service`] to pass requests to and receive responses /// from. next_svc: NextSvc, @@ -63,6 +68,11 @@ pub struct EdnsMiddlewareSvc { impl EdnsMiddlewareSvc +where + NextSvc: Service, + NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Unpin + Send + Sync + 'static, { /// Creates an instance of this middleware service. #[must_use] @@ -83,10 +93,10 @@ impl impl EdnsMiddlewareSvc where - RequestOctets: Octets + Send + Sync + Unpin, NextSvc: Service, - NextSvc::Target: Composer + Default, - RequestMeta: Clone + Default, + NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Unpin + Send + Sync + 'static, { fn preprocess( &self, @@ -411,11 +421,10 @@ where impl Service for EdnsMiddlewareSvc where - RequestOctets: Octets + Send + Sync + 'static + Unpin, - RequestMeta: Clone + Default + Unpin, NextSvc: Service, - NextSvc::Target: Composer + Default, NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Unpin + Send + Sync + 'static, { type Target = NextSvc::Target; type Stream = MiddlewareStream< @@ -431,7 +440,7 @@ where Once::Item>>, ::Item, >; - type Future = core::future::Ready; + type Future = Ready; fn call( &self, @@ -568,7 +577,7 @@ mod tests { ); fn my_service( - req: Request>, + req: Request, ()>, _meta: (), ) -> ServiceResult> { // For each request create a single response: diff --git a/src/net/server/middleware/mandatory.rs b/src/net/server/middleware/mandatory.rs index 933b21afb..5174090b9 100644 --- a/src/net/server/middleware/mandatory.rs +++ b/src/net/server/middleware/mandatory.rs @@ -11,7 +11,7 @@ use tracing::{debug, error, trace, warn}; use crate::base::iana::{Opcode, OptRcode}; use crate::base::message_builder::{AdditionalBuilder, PushError}; -use crate::base::wire::{Composer, ParseError}; +use crate::base::wire::ParseError; use crate::base::{Message, StreamTarget}; use crate::net::server::message::{Request, TransportSpecificContext}; use crate::net::server::service::{CallResult, Service, ServiceResult}; @@ -41,7 +41,13 @@ pub const MINIMUM_RESPONSE_BYTE_LEN: u16 = 512; /// [2181]: https://datatracker.ietf.org/doc/html/rfc2181 /// [9619]: https://datatracker.ietf.org/doc/html/rfc9619 #[derive(Clone, Debug)] -pub struct MandatoryMiddlewareSvc { +pub struct MandatoryMiddlewareSvc +where + NextSvc: Service, + NextSvc::Future: Unpin, + RequestMeta: Clone + Default + 'static + Send + Sync + Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, +{ /// The upstream [`Service`] to pass requests to and receive responses /// from. next_svc: NextSvc, @@ -55,6 +61,11 @@ pub struct MandatoryMiddlewareSvc { impl MandatoryMiddlewareSvc +where + NextSvc: Service, + NextSvc::Future: Unpin, + RequestMeta: Clone + Default + 'static + Send + Sync + Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, { /// Creates an instance of this middleware service. /// @@ -84,10 +95,10 @@ impl impl MandatoryMiddlewareSvc where - RequestOctets: Octets + Send + Sync + Unpin, NextSvc: Service, - NextSvc::Target: Composer + Default, - RequestMeta: Clone + Default, + NextSvc::Future: Unpin, + RequestMeta: Clone + Default + 'static + Send + Sync + Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, { /// Truncate the given response message if it is too large. /// @@ -303,11 +314,10 @@ where impl Service for MandatoryMiddlewareSvc where - RequestOctets: Octets + Send + Sync + 'static + Unpin, NextSvc: Service, NextSvc::Future: Unpin, - NextSvc::Target: Composer + Default, - RequestMeta: Clone + Default + Unpin, + RequestMeta: Clone + Default + 'static + Send + Sync + Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, { type Target = NextSvc::Target; type Stream = MiddlewareStream< @@ -331,6 +341,7 @@ where ) -> Self::Future { match self.preprocess(request.message()) { ControlFlow::Continue(()) => { + let request = request.with_new_metadata(Default::default()); let svc_call_fut = self.next_svc.call(request.clone()); let map = PostprocessingStream::new( svc_call_fut, @@ -341,6 +352,7 @@ where ready(MiddlewareStream::Map(map)) } ControlFlow::Break(mut response) => { + let request = request.with_new_metadata(Default::default()); Self::postprocess(&request, &mut response, self.strict); ready(MiddlewareStream::Result(once(ready(Ok( CallResult::new(response), @@ -457,7 +469,7 @@ mod tests { ); fn my_service( - req: Request>, + req: Request, ()>, _meta: (), ) -> ServiceResult> { // For each request create a single response: diff --git a/src/net/server/middleware/notify.rs b/src/net/server/middleware/notify.rs index 2eef038ef..fa0e465f9 100644 --- a/src/net/server/middleware/notify.rs +++ b/src/net/server/middleware/notify.rs @@ -61,7 +61,6 @@ use crate::base::message::CopyRecordsError; use crate::base::message_builder::AdditionalBuilder; use crate::base::name::Name; use crate::base::net::IpAddr; -use crate::base::wire::Composer; use crate::base::{ Message, ParsedName, Question, Rtype, StreamTarget, ToName, }; @@ -80,7 +79,15 @@ use crate::rdata::AllRecordData; /// /// [RFC 1996]: https://www.rfc-editor.org/info/rfc1996 #[derive(Clone, Debug)] -pub struct NotifyMiddlewareSvc { +pub struct NotifyMiddlewareSvc +where + NextSvc: Service + Unpin + Clone, + NextSvc::Future: Sync + Unpin, + N: Notifiable + Clone + Sync + Send + 'static, + RequestOctets: Octets + Send + Sync + 'static + Clone, + RequestMeta: Clone + Default + Sync + Send + 'static, + for<'a> ::Range<'a>: Send + Sync, +{ /// The upstream [`Service`] to pass requests to and receive responses /// from. next_svc: NextSvc, @@ -93,6 +100,13 @@ pub struct NotifyMiddlewareSvc { impl NotifyMiddlewareSvc +where + NextSvc: Service + Unpin + Clone, + NextSvc::Future: Sync + Unpin, + N: Notifiable + Clone + Sync + Send + 'static, + RequestOctets: Octets + Send + Sync + 'static + Clone, + RequestMeta: Clone + Default + Sync + Send + 'static, + for<'a> ::Range<'a>: Send + Sync, { /// Creates an instance of this middleware service. /// @@ -111,11 +125,12 @@ impl impl NotifyMiddlewareSvc where - RequestOctets: Octets + Send + Sync, - RequestMeta: Clone + Default, - NextSvc: Service, - NextSvc::Target: Composer + Default, - N: Clone + Notifiable + Sync + Send, + NextSvc: Service + Unpin + Clone, + NextSvc::Future: Sync + Unpin, + N: Notifiable + Clone + Sync + Send + 'static, + RequestOctets: Octets + Send + Sync + 'static + Clone, + RequestMeta: Clone + Default + Sync + Send + 'static, + for<'a> ::Range<'a>: Send + Sync, { /// Pre-process received DNS NOTIFY queries. /// @@ -326,18 +341,12 @@ impl Service for NotifyMiddlewareSvc where - RequestOctets: Octets + Send + Sync + 'static, + NextSvc: Service + Unpin + Clone, + NextSvc::Future: Sync + Unpin, + N: Notifiable + Clone + Sync + Send + 'static, + RequestOctets: Octets + Send + Sync + 'static + Clone, RequestMeta: Clone + Default + Sync + Send + 'static, for<'a> ::Range<'a>: Send + Sync, - NextSvc: Service - + Clone - + 'static - + Send - + Sync - + Unpin, - NextSvc::Future: Send + Sync + Unpin, - NextSvc::Target: Composer + Default + Send + Sync, - N: Notifiable + Clone + Sync + Send + 'static, { type Target = NextSvc::Target; type Stream = MiddlewareStream< diff --git a/src/net/server/middleware/tsig.rs b/src/net/server/middleware/tsig.rs index 195fe3484..4edb9add3 100644 --- a/src/net/server/middleware/tsig.rs +++ b/src/net/server/middleware/tsig.rs @@ -66,20 +66,33 @@ use futures_util::Stream; /// Upstream services can detect whether a request is signed and with which /// key by consuming the `Option` metadata output by this service. #[derive(Clone, Debug)] -pub struct TsigMiddlewareSvc +pub struct TsigMiddlewareSvc where + Infallible: From<>>::Error>, KS: Clone + KeyStore, + KS::Key: Clone, + NextSvc: Service>, + NextSvc::Target: Composer + Default, + RequestOctets: Octets + OctetsFrom> + Send + Sync + Unpin, { next_svc: NextSvc, key_store: KS, - _phantom: PhantomData, + _phantom: PhantomData<(RequestOctets, IgnoredRequestMeta)>, } -impl TsigMiddlewareSvc +impl + TsigMiddlewareSvc where - KS: Clone + KeyStore, + IgnoredRequestMeta: Default + Clone + Send + Sync + Unpin + 'static, + Infallible: From<>>::Error>, + KS: Clone + KeyStore + Unpin + Send + Sync + 'static, + KS::Key: Clone + Unpin + Send + Sync, + NextSvc: Service>, + NextSvc::Future: Unpin, + RequestOctets: + Octets + OctetsFrom> + Send + Sync + 'static + Unpin + Clone, { /// Creates an instance of this middleware service. /// @@ -95,18 +108,21 @@ where } } -impl TsigMiddlewareSvc +impl + TsigMiddlewareSvc where - RequestOctets: Octets + OctetsFrom> + Send + Sync + Unpin, - NextSvc: Service>, - NextSvc::Target: Composer + Default, - KS: Clone + KeyStore, - KS::Key: Clone, + IgnoredRequestMeta: Default + Clone + Send + Sync + Unpin + 'static, Infallible: From<>>::Error>, + KS: Clone + KeyStore + Unpin + Send + Sync + 'static, + KS::Key: Clone + Unpin + Send + Sync, + NextSvc: Service>, + NextSvc::Future: Unpin, + RequestOctets: + Octets + OctetsFrom> + Send + Sync + 'static + Unpin + Clone, { #[allow(clippy::type_complexity)] fn preprocess( - req: &Request, + req: &Request, key_store: &KS, ) -> Result< ControlFlow< @@ -188,7 +204,7 @@ where /// Sign the given response, or if necessary construct and return an /// alternate response. fn postprocess( - request: &Request, + request: &Request, response: &mut AdditionalBuilder>, state: &mut PostprocessingState, ) -> Result< @@ -272,7 +288,7 @@ where } fn mk_signed_truncated_response( - request: &Request, + request: &Request, truncation_ctx: TruncationContext, ) -> Result>, ServiceError> { @@ -334,7 +350,7 @@ where } fn map_stream_item( - request: Request, + request: Request, stream_item: ServiceResult, pp_config: &mut PostprocessingState, ) -> ServiceResult { @@ -396,17 +412,18 @@ where /// and (b) because this service does not propagate the metadata it receives /// from downstream but instead outputs [`Option`] metadata to /// upstream services. -impl Service - for TsigMiddlewareSvc +impl + Service + for TsigMiddlewareSvc where - RequestOctets: - Octets + OctetsFrom> + Send + Sync + 'static + Unpin, + IgnoredRequestMeta: Default + Clone + Send + Sync + Unpin + 'static, + Infallible: From<>>::Error>, + KS: Clone + KeyStore + Unpin + Send + Sync + 'static, + KS::Key: Clone + Unpin + Send + Sync, NextSvc: Service>, NextSvc::Future: Unpin, - NextSvc::Target: Composer + Default, - KS: Clone + KeyStore + Unpin, - KS::Key: Clone + Unpin, - Infallible: From<>>::Error>, + RequestOctets: + Octets + OctetsFrom> + Send + Sync + 'static + Unpin + Clone, { type Target = NextSvc::Target; type Stream = MiddlewareStream< @@ -416,7 +433,7 @@ where RequestOctets, NextSvc::Future, NextSvc::Stream, - (), + IgnoredRequestMeta, PostprocessingState, >, Once>>, @@ -424,7 +441,10 @@ where >; type Future = Ready; - fn call(&self, request: Request) -> Self::Future { + fn call( + &self, + request: Request, + ) -> Self::Future { match Self::preprocess(&request, &self.key_store) { Ok(ControlFlow::Continue(Some((modified_req, signer)))) => { let pp_config = PostprocessingState::new(signer); diff --git a/src/net/server/middleware/xfr/data_provider.rs b/src/net/server/middleware/xfr/data_provider.rs index 65aae2619..a274da5d8 100644 --- a/src/net/server/middleware/xfr/data_provider.rs +++ b/src/net/server/middleware/xfr/data_provider.rs @@ -85,7 +85,7 @@ impl XfrData { //------------ XfrDataProvider ------------------------------------------------ /// A provider of data needed for responding to XFR requests. -pub trait XfrDataProvider { +pub trait XfrDataProvider { type Diff: ZoneDiff + Send + Sync; /// Request data needed to respond to an XFR request. diff --git a/src/net/server/middleware/xfr/service.rs b/src/net/server/middleware/xfr/service.rs index d0862f5e7..8f330eaba 100644 --- a/src/net/server/middleware/xfr/service.rs +++ b/src/net/server/middleware/xfr/service.rs @@ -16,7 +16,6 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, info, trace, warn}; use crate::base::iana::{Opcode, OptRcode}; -use crate::base::wire::Composer; use crate::base::{Message, ParsedName, Question, Rtype, Serial, ToName}; use crate::net::server::message::{Request, TransportSpecificContext}; use crate::net::server::middleware::stream::MiddlewareStream; @@ -55,7 +54,18 @@ const MAX_TCP_MSG_BYTE_LEN: u16 = u16::MAX; /// /// [module documentation]: crate::net::server::middleware::xfr #[derive(Clone, Debug)] -pub struct XfrMiddlewareSvc { +pub struct XfrMiddlewareSvc +where + RequestOctets: Octets + Send + Sync + Unpin + 'static + Clone, + for<'a> ::Range<'a>: Send + Sync, + NextSvc: + Service + Clone + Send + Sync + 'static, + NextSvc::Future: Sync + Unpin, + NextSvc::Stream: Sync, + XDP: XfrDataProvider + Clone + Sync + Send + 'static, + XDP::Diff: Debug + Sync, + RequestMeta: Clone + Default + Sync + Send + 'static, +{ /// The upstream [`Service`] to pass requests to and receive responses /// from. next_svc: NextSvc, @@ -78,7 +88,15 @@ pub struct XfrMiddlewareSvc { impl XfrMiddlewareSvc where - XDP: XfrDataProvider, + RequestOctets: Octets + Send + Sync + Unpin + 'static + Clone, + for<'a> ::Range<'a>: Send + Sync, + NextSvc: + Service + Clone + Send + Sync + 'static, + NextSvc::Future: Sync + Unpin, + NextSvc::Stream: Sync, + XDP: XfrDataProvider + Clone + Sync + Send + 'static, + XDP::Diff: Debug + Sync, + RequestMeta: Clone + Default + Sync + Send + 'static, { /// Creates a new instance of this middleware. /// @@ -110,14 +128,15 @@ where impl XfrMiddlewareSvc where - RequestOctets: Octets + Send + Sync + 'static + Unpin, + RequestOctets: Octets + Send + Sync + Unpin + 'static + Clone, for<'a> ::Range<'a>: Send + Sync, - NextSvc: Service + Clone + Send + Sync + 'static, - NextSvc::Future: Send + Sync + Unpin, - NextSvc::Target: Composer + Default + Send + Sync, - NextSvc::Stream: Send + Sync, - XDP: XfrDataProvider, - XDP::Diff: Debug + 'static, + NextSvc: + Service + Clone + Send + Sync + 'static, + NextSvc::Future: Sync + Unpin, + NextSvc::Stream: Sync, + XDP: XfrDataProvider + Clone + Sync + Send + 'static, + XDP::Diff: Debug + Sync, + RequestMeta: Clone + Default + Sync + Send + 'static, { /// Pre-process received DNS XFR queries. /// @@ -684,12 +703,12 @@ impl Service for XfrMiddlewareSvc where - RequestOctets: Octets + Send + Sync + Unpin + 'static, + RequestOctets: Octets + Send + Sync + Unpin + 'static + Clone, for<'a> ::Range<'a>: Send + Sync, - NextSvc: Service + Clone + Send + Sync + 'static, - NextSvc::Future: Send + Sync + Unpin, - NextSvc::Target: Composer + Default + Send + Sync, - NextSvc::Stream: Send + Sync, + NextSvc: + Service + Clone + Send + Sync + 'static, + NextSvc::Future: Sync + Unpin, + NextSvc::Stream: Sync, XDP: XfrDataProvider + Clone + Sync + Send + 'static, XDP::Diff: Debug + Sync, RequestMeta: Clone + Default + Sync + Send + 'static, @@ -721,7 +740,8 @@ where .await { Ok(ControlFlow::Continue(())) => { - let request = request.with_new_metadata(()); + let request = + request.with_new_metadata(Default::default()); let stream = next_svc.call(request).await; MiddlewareStream::IdentityStream(stream) } diff --git a/src/net/server/middleware/xfr/tests.rs b/src/net/server/middleware/xfr/tests.rs index d4849a25b..ddad7e7b3 100644 --- a/src/net/server/middleware/xfr/tests.rs +++ b/src/net/server/middleware/xfr/tests.rs @@ -57,7 +57,7 @@ async fn axfr_with_example_zone() { "../../../../../test-data/zonefiles/nsd-example.txt" )); - let req = mk_axfr_request(zone.apex_name(), ()); + let req = mk_axfr_request(zone.apex_name(), Default::default()); let res = do_preprocess(zone.clone(), &req).await.unwrap(); @@ -125,7 +125,7 @@ async fn axfr_multi_response() { "../../../../../test-data/zonefiles/big.example.com.txt" )); - let req = mk_axfr_request(zone.apex_name(), ()); + let req = mk_axfr_request(zone.apex_name(), Default::default()); let res = do_preprocess(zone.clone(), &req).await.unwrap(); @@ -203,7 +203,7 @@ async fn axfr_not_allowed_over_udp() { "../../../../../test-data/zonefiles/nsd-example.txt" )); - let req = mk_udp_axfr_request(zone.apex_name(), ()); + let req = mk_udp_axfr_request(zone.apex_name(), Default::default()); let res = do_preprocess(zone, &req).await.unwrap(); @@ -245,7 +245,8 @@ JAIN-BB.JAIN.AD.JP. IN A 192.41.197.2 let zone_with_diffs = ZoneWithDiffs::new(zone.clone(), vec![]); // The following IXFR query - let req = mk_udp_ixfr_request(zone.apex_name(), Serial(1), ()); + let req = + mk_udp_ixfr_request(zone.apex_name(), Serial(1), Default::default()); let res = do_preprocess(zone_with_diffs, &req).await.unwrap(); @@ -383,7 +384,8 @@ JAIN-BB.JAIN.AD.JP. IN A 192.41.197.2 let zone_with_diffs = ZoneWithDiffs::new(zone.clone(), diffs); // The following IXFR query - let req = mk_ixfr_request(zone.apex_name(), Serial(1), ()); + let req = + mk_ixfr_request(zone.apex_name(), Serial(1), Default::default()); let res = do_preprocess(zone_with_diffs, &req).await.unwrap(); @@ -480,7 +482,8 @@ async fn ixfr_rfc1995_section7_udp_packet_overflow() { "../../../../../test-data/zonefiles/big.example.com.txt" )); - let req = mk_udp_ixfr_request(zone.apex_name(), Serial(0), ()); + let req = + mk_udp_ixfr_request(zone.apex_name(), Serial(0), Default::default()); let res = do_preprocess(zone.clone(), &req).await.unwrap(); @@ -506,6 +509,7 @@ async fn axfr_with_tsig_key() { // type over which the Request produced by TsigMiddlewareSvc is generic. // When the XfrMiddlewareSvc receives a Request it // passes it to the XfrDataProvider which in turn can inspect it. + #[derive(Clone)] struct KeyReceivingXfrDataProvider { key: Arc, checked: Arc, @@ -685,23 +689,24 @@ fn mk_ixfr_request_for_transport( Request::new(client_addr, received_at, msg, transport_specific, metadata) } -async fn do_preprocess>( +async fn do_preprocess( zone: XDP, - req: &Request, RequestMeta>, + req: &Request, Option>>, ) -> Result< ControlFlow< XfrMiddlewareStream< - ::Future, - ::Stream, - <::Stream as Stream>::Item, + , Option>>>::Future, + , Option>>>::Stream, + <, Option>>>::Stream as Stream>::Item, >, >, OptRcode, > where + XDP: XfrDataProvider>> + Clone + Sync + Send + 'static, XDP::Diff: Debug + 'static, { - XfrMiddlewareSvc::, TestNextSvc, RequestMeta, XDP>::preprocess( + XfrMiddlewareSvc::, TestNextSvc, Option>, XDP>::preprocess( Arc::new(Semaphore::new(1)), Arc::new(Semaphore::new(1)), req, @@ -771,16 +776,20 @@ async fn assert_stream_eq< #[derive(Clone)] struct TestNextSvc; -impl Service, ()> for TestNextSvc { +impl Service, Option>> for TestNextSvc { type Target = Vec; type Stream = Once>>; type Future = Ready; - fn call(&self, _request: Request, ()>) -> Self::Future { + fn call( + &self, + _request: Request, Option>>, + ) -> Self::Future { todo!() } } +#[derive(Clone)] struct ZoneWithDiffs { zone: Zone, diffs: Vec>, @@ -806,11 +815,11 @@ impl ZoneWithDiffs { } } -impl XfrDataProvider for ZoneWithDiffs { +impl XfrDataProvider for ZoneWithDiffs { type Diff = Arc; fn request( &self, - req: &Request, + req: &Request, diff_from: Option, ) -> Pin< Box< diff --git a/src/net/server/qname_router.rs b/src/net/server/qname_router.rs index 749b8f7a5..46c2660e2 100644 --- a/src/net/server/qname_router.rs +++ b/src/net/server/qname_router.rs @@ -22,21 +22,24 @@ use tracing::trace; /// A service that routes requests to other services based on the Qname in the /// request. -pub struct QnameRouter { +pub struct QnameRouter { /// List of names and services for routing requests. - list: Vec>, + list: Vec>, } /// Element in the name space for the Qname router. -struct Element { +struct Element { /// Name to match for this element. name: Name, /// Service to call for this element. - service: Box + Send + Sync>, + service: + Box + Send + Sync>, } -impl QnameRouter { +impl + QnameRouter +{ /// Create a new empty router. pub fn new() -> Self { Self { list: Vec::new() } @@ -50,7 +53,10 @@ impl QnameRouter { EmptyBuilder + OctetsBuilder, TN: ToName, RequestOcts: Send + Sync, - SVC: SingleService + Send + Sync + 'static, + SVC: SingleService + + Send + + Sync + + 'static, { let el = Element { name: name.to_name(), @@ -60,22 +66,26 @@ impl QnameRouter { } } -impl Default for QnameRouter { +impl Default + for QnameRouter +{ fn default() -> Self { Self::new() } } -impl SingleService - for QnameRouter +impl + SingleService + for QnameRouter where Octs: AsRef<[u8]>, + RequestMeta: Clone, RequestOcts: Send + Sync, CR: ComposeReply + Send + Sync + 'static, { fn call( &self, - request: Request, + request: Request, ) -> Pin> + Send + Sync>> where RequestOcts: AsRef<[u8]> + Octets, diff --git a/src/net/server/service.rs b/src/net/server/service.rs index 13e177bd9..455cd05b1 100644 --- a/src/net/server/service.rs +++ b/src/net/server/service.rs @@ -7,11 +7,10 @@ use core::fmt::Display; use core::ops::Deref; use std::time::Duration; -use std::vec::Vec; use crate::base::iana::Rcode; use crate::base::message_builder::{AdditionalBuilder, PushError}; -use crate::base::wire::ParseError; +use crate::base::wire::{Composer, ParseError}; use crate::base::StreamTarget; use super::message::Request; @@ -167,19 +166,20 @@ pub type ServiceResult = Result, ServiceError>; /// [`call`]: Self::call() /// [`service_fn`]: crate::net::server::util::service_fn() pub trait Service< - RequestOctets: AsRef<[u8]> + Send + Sync = Vec, - RequestMeta: Clone + Default = (), -> + RequestOctets: AsRef<[u8]> + Send + Sync, + RequestMeta: Clone + Default, +>: Send + Sync + 'static { /// The underlying byte storage type used to hold generated responses. - type Target; + type Target: Composer + Default + Send + Sync; /// The type of stream that the service produces. type Stream: futures_util::stream::Stream> - + Unpin; + + Unpin + + Send; /// The type of future that will yield the service result stream. - type Future: core::future::Future; + type Future: core::future::Future + Send; /// Generate a response to a fully pre-processed request. fn call( @@ -195,8 +195,8 @@ impl Service for U where RequestOctets: Unpin + Send + Sync + AsRef<[u8]>, - T: ?Sized + Service, - U: Deref + Clone, + T: Service, + U: Deref + Clone + Send + Sync + 'static, RequestMeta: Clone + Default, { type Target = T::Target; diff --git a/src/net/server/single_service.rs b/src/net/server/single_service.rs index 28c6d19fe..73035635c 100644 --- a/src/net/server/single_service.rs +++ b/src/net/server/single_service.rs @@ -22,13 +22,13 @@ use std::pin::Pin; use std::vec::Vec; /// Trait for a service that results in a single response. -pub trait SingleService { +pub trait SingleService { /// Call the service with a request message. /// /// The service returns a boxed future. fn call( &self, - request: Request, + request: Request, ) -> Pin> + Send + Sync>> where RequestOcts: AsRef<[u8]> + Octets; diff --git a/src/net/server/stream.rs b/src/net/server/stream.rs index c22b39d36..617086c85 100644 --- a/src/net/server/stream.rs +++ b/src/net/server/stream.rs @@ -39,7 +39,6 @@ use crate::utils::config::DefMinMax; use super::buf::VecBufSource; use super::connection::{self, Connection}; use super::ServerCommand; -use crate::base::wire::Composer; use tokio::io::{AsyncRead, AsyncWrite}; // TODO: Should this crate also provide a TLS listener implementation? @@ -270,8 +269,7 @@ where Listener: AsyncAccept + Send + Sync, Buf: BufSource + Send + Sync + Clone, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Send + Sync + Clone, - Svc::Target: Composer + Default, // + 'static, + Svc: Service + Clone, { /// The configuration of the server. config: Arc>, @@ -315,8 +313,7 @@ where Listener: AsyncAccept + Send + Sync, Buf: BufSource + Send + Sync + Clone, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Send + Sync + Clone, - Svc::Target: Composer + Default, + Svc: Service + Clone, { /// Creates a new [`StreamServer`] instance. /// @@ -395,8 +392,7 @@ where Listener: AsyncAccept + Send + Sync, Buf: BufSource + Send + Sync + Clone, Buf::Output: Octets + Debug + Send + Sync + Unpin, - Svc: Service + Send + Sync + Clone, - Svc::Target: Composer + Default, + Svc: Service + Clone, { /// Get a reference to the source for this server. #[must_use] @@ -418,8 +414,7 @@ where Listener: AsyncAccept + Send + Sync, Buf: BufSource + Send + Sync + Clone, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Send + Sync + Clone, - Svc::Target: Composer + Default, + Svc: Service + Clone, { /// Start the server. /// @@ -435,10 +430,6 @@ where Listener::Error: Send, Listener::Future: Send + 'static, Listener::StreamType: AsyncRead + AsyncWrite + Send + Sync + 'static, - Svc: 'static, - Svc::Target: Send + Sync, - Svc::Stream: Send, - Svc::Future: Send, { if let Err(err) = self.run_until_error().await { error!("Server stopped due to error: {err}"); @@ -513,8 +504,7 @@ where Listener: AsyncAccept + Send + Sync, Buf: BufSource + Send + Sync + Clone, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Send + Sync + Clone, - Svc::Target: Composer + Default, + Svc: Service + Clone, { /// Accept stream connections until shutdown or fatal error. async fn run_until_error(&self) -> Result<(), String> @@ -524,10 +514,6 @@ where Listener::Error: Send, Listener::Future: Send + 'static, Listener::StreamType: AsyncRead + AsyncWrite + Send + Sync + 'static, - Svc: 'static, - Svc::Target: Send + Sync, - Svc::Stream: Send, - Svc::Future: Send, { let mut command_rx = self.command_rx.clone(); @@ -646,10 +632,6 @@ where Listener::Error: Send, Listener::Future: Send + 'static, Listener::StreamType: AsyncRead + AsyncWrite + Send + Sync + 'static, - Svc: 'static, - Svc::Target: Composer + Send + Sync, - Svc::Stream: Send, - Svc::Future: Send, { // Work around the compiler wanting to move self to the async block by // preparing only those pieces of information from self for the new @@ -713,8 +695,7 @@ where Listener: AsyncAccept + Send + Sync, Buf: BufSource + Send + Sync + Clone, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Send + Sync + Clone, - Svc::Target: Composer + Default, + Svc: Service + Clone, { fn drop(&mut self) { // Shutdown the StreamServer. Don't handle the failure case here as diff --git a/src/net/server/tests/integration.rs b/src/net/server/tests/integration.rs index bd02f52bc..178517a70 100644 --- a/src/net/server/tests/integration.rs +++ b/src/net/server/tests/integration.rs @@ -22,7 +22,6 @@ use tracing::{trace, warn}; use crate::base::iana::{Class, Rcode}; use crate::base::name::ToName; use crate::base::net::IpAddr; -use crate::base::wire::Composer; use crate::base::Name; use crate::base::Rtype; use crate::net::client::request::{RequestMessage, RequestMessageMulti}; @@ -233,17 +232,14 @@ fn mk_servers( Arc>, ) where - Svc: Clone + Service + Send + Sync, - ::Future: Send, - ::Target: Composer + Default + Send + Sync, - ::Stream: Send, + Svc: Service, ()> + Clone, { // Prepare middleware to be used by the DNS servers to pre-process // received requests and post-process created responses. let (dgram_config, stream_config) = mk_server_configs(server_config); // Create a dgram server for handling UDP requests. - let dgram_server = DgramServer::<_, _, Svc>::with_config( + let dgram_server = DgramServer::with_config( dgram_server_conn.clone(), VecBufSource, service.clone(), diff --git a/src/net/server/tests/unit.rs b/src/net/server/tests/unit.rs index 834799611..89139439b 100644 --- a/src/net/server/tests/unit.rs +++ b/src/net/server/tests/unit.rs @@ -329,6 +329,7 @@ impl futures_util::stream::Stream for MySingle { /// A mock service that returns MySingle whenever it receives a message. /// Just to show MySingle in action. +#[derive(Clone)] struct MyService; impl MyService { @@ -337,12 +338,15 @@ impl MyService { } } -impl Service> for MyService { +impl Service, ()> for MyService +where + Self: Clone + Send + Sync + 'static, +{ type Target = Vec; type Stream = MySingle; type Future = Ready; - fn call(&self, request: Request>) -> Self::Future { + fn call(&self, request: Request, ()>) -> Self::Future { trace!("Processing request id {}", request.message().header().id()); ready(MySingle::new()) } diff --git a/src/net/server/util.rs b/src/net/server/util.rs index 220b6171f..2b30d5872 100644 --- a/src/net/server/util.rs +++ b/src/net/server/util.rs @@ -134,12 +134,13 @@ where RequestOctets: AsRef<[u8]> + Send + Sync + Unpin, RequestMeta: Default + Clone, Metadata: Clone, - Target: Composer + Default, + Target: Composer + Default + Send + Sync, T: Fn( Request, Metadata, ) -> ServiceResult + Clone, + Self: Clone + Send + Sync + 'static, { type Target = Target; type Stream = Once>>;