Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement middleware types to allow intercepting client & request handling #232

Merged
merged 5 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Internalized the `BuildError` type, consolidating on the `Error` type ([#228](https://github.com/opensearch-project/opensearch-rs/pull/228))

### Added
- Added middleware types to allow intercepting construction and handling of the underlying `reqwest` client & requests ([#232](https://github.com/opensearch-project/opensearch-rs/pull/232))

### Dependencies
- Bumps `aws-*` dependencies to `1` ([#219](https://github.com/opensearch-project/opensearch-rs/pull/219))
Expand Down
1 change: 1 addition & 0 deletions opensearch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ rustls-tls = ["reqwest/rustls-tls"]
aws-auth = ["aws-credential-types", "aws-sigv4", "aws-smithy-runtime-api", "aws-types"]

[dependencies]
async-trait = "0.1"
base64 = "0.21"
bytes = "1.0"
dyn-clone = "1"
Expand Down
6 changes: 6 additions & 0 deletions opensearch/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,9 @@ impl OpenSearch {
.await
}
}

// Ensure that the client is `Send` and `Sync`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is certainly useful but shouldn't be moved to tests?

const _: () = {
const fn is_send_sync<T: Send + Sync>() {}
is_send_sync::<OpenSearch>()
};
28 changes: 26 additions & 2 deletions opensearch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@

use crate::{
cert::CertificateError,
http::{transport, StatusCode},
http::{
middleware::{RequestPipelineError, RequestPipelineErrorKind},
transport, StatusCode,
},
};

pub(crate) type BoxError<'a> = Box<dyn std::error::Error + Send + Sync + 'a>;
Expand All @@ -53,7 +56,7 @@ where
Kind: From<E>,
{
fn from(error: E) -> Self {
Self(Kind::from(error))
Self(error.into())
}
}

Expand All @@ -80,11 +83,32 @@ enum Kind {
#[cfg(feature = "aws-auth")]
#[error("AwsSigV4 error: {0}")]
AwsSigV4(#[from] crate::http::aws_auth::AwsSigV4Error),

#[error("request initializer error: {0}")]
RequestInitializer(#[source] BoxError<'static>),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could get right of boxing (and this BoxError) type altogether and just use the reference &'static (dyn std::error::Error + Sync + Send):

    #[error("request initializer error: {0}")]
    RequestInitializer(#[source] &'static (dyn std::error::Error + Sync + Send)),

    #[error("request pipeline error: {0}")]
    RequestPipeline(#[source] &'static (dyn std::error::Error + Sync + Send)),

It is not necessarily a better option (in my opinion) but it is a bit simpler from "less abstractions" standpoint, curious what do you think about it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we took a &'static reference it would mean any errors returned would need to be defined as consts or leaked memory. The + 'static bound here just means the contained data must be owned (or contain only &'static references), such that we can hold onto the type for an indeterminate amount of time. https://doc.rust-lang.org/rust-by-example/scope/lifetime/static_lifetime.html

A Box is the best way to take ownership of the arbitrary error type and "erase" it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Box is the best way to take ownership of the arbitrary error type and "erase" it.

That is a strong argument, thank you. I have no other comments left


#[error("request pipeline error: {0}")]
RequestPipeline(#[source] BoxError<'static>),
}

impl From<RequestPipelineError> for Kind {
fn from(err: RequestPipelineError) -> Self {
use RequestPipelineErrorKind::*;

match err.0 {
Pipeline(err) => Self::RequestPipeline(err),
Http(err) => Self::Http(err),
}
}
}

use Kind::*;

impl Error {
pub(crate) fn request_initializer(err: BoxError<'static>) -> Self {
Self(RequestInitializer(err))
}

/// The status code, if the error was generated from a response
pub fn status_code(&self) -> Option<StatusCode> {
match &self.0 {
Expand Down
47 changes: 47 additions & 0 deletions opensearch/src/http/middleware/initializers/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

use super::InitializerResult;
use crate::BoxError;
use reqwest::ClientBuilder;

pub trait ClientInitializer: 'static {
type Result: InitializerResult<ClientBuilder>;

fn init(self, client: ClientBuilder) -> Self::Result;
}

impl<F, R> ClientInitializer for F
where
F: FnOnce(ClientBuilder) -> R + 'static,
R: InitializerResult<ClientBuilder>,
{
type Result = R;

fn init(self, client: ClientBuilder) -> Self::Result {
self(client)
}
}

pub(crate) trait BoxedClientInitializer {
fn init(self: Box<Self>, client: ClientBuilder) -> Result<ClientBuilder, BoxError<'static>>;
}

impl<T> BoxedClientInitializer for T
where
T: ClientInitializer + Sized,
{
fn init(self: Box<Self>, client: ClientBuilder) -> Result<ClientBuilder, BoxError<'static>> {
ClientInitializer::init(*self, client)
.into_result()
.map_err(Into::into)
}
}
44 changes: 44 additions & 0 deletions opensearch/src/http/middleware/initializers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

mod client;
mod request;

use crate::BoxError;
use std::convert::Infallible;

pub use client::*;
pub use request::*;

pub trait InitializerResult<T> {
type Error: Into<BoxError<'static>>;

fn into_result(self) -> Result<T, Self::Error>;
}

impl<T, E> InitializerResult<T> for Result<T, E>
where
E: Into<BoxError<'static>>,
{
type Error = E;

fn into_result(self) -> Result<T, Self::Error> {
self
}
}

impl<T> InitializerResult<T> for T {
type Error = Infallible;

fn into_result(self) -> Result<T, Infallible> {
Ok(self)
}
}
86 changes: 86 additions & 0 deletions opensearch/src/http/middleware/initializers/request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

use super::InitializerResult;
use crate::BoxError;
use reqwest::RequestBuilder;

pub trait RequestInitializer: std::fmt::Debug + Send + Sync + 'static {
type Result: InitializerResult<RequestBuilder>;

fn init(&self, request: RequestBuilder) -> Self::Result;
}

#[derive(Clone)]
pub struct RequestInitializerFn<F>(F);

pub fn request_initializer_fn<F>(f: F) -> RequestInitializerFn<F> {
RequestInitializerFn(f)
}

impl<F> std::fmt::Debug for RequestInitializerFn<F> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct(stringify!(RequestInitializerFn)).finish()
}
}

impl<F, R> RequestInitializer for RequestInitializerFn<F>
where
F: Fn(RequestBuilder) -> R + Send + Sync + 'static,
R: InitializerResult<RequestBuilder>,
{
type Result = R;

fn init(&self, request: RequestBuilder) -> Self::Result {
self.0(request)
}
}

impl<R> RequestInitializer for std::sync::Arc<R>
where
R: RequestInitializer,
{
type Result = R::Result;

fn init(&self, request: RequestBuilder) -> Self::Result {
self.as_ref().init(request)
}
}

impl<R> RequestInitializer for std::sync::Arc<dyn RequestInitializer<Result = R>>
where
R: InitializerResult<RequestBuilder> + 'static,
{
type Result = R;

fn init(&self, request: RequestBuilder) -> Self::Result {
self.as_ref().init(request)
}
}

pub(crate) trait BoxedRequestInitializer:
dyn_clone::DynClone + std::fmt::Debug + Send + Sync + 'static
{
fn init(&self, request: RequestBuilder) -> Result<RequestBuilder, BoxError<'static>>;
}

impl<T> BoxedRequestInitializer for T
where
T: RequestInitializer + Clone,
{
fn init(&self, request: RequestBuilder) -> Result<RequestBuilder, BoxError<'static>> {
RequestInitializer::init(self, request)
.into_result()
.map_err(Into::into)
}
}

dyn_clone::clone_trait_object!(BoxedRequestInitializer);
20 changes: 20 additions & 0 deletions opensearch/src/http/middleware/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

mod initializers;
mod request_pipeline;

pub use async_trait::async_trait;
pub use initializers::*;
pub use request_pipeline::*;

pub(crate) type BoxFuture<'a, T> =
std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
Loading
Loading