Skip to content

Commit

Permalink
Merge pull request #10 from Fabcien/custom_codec
Browse files Browse the repository at this point in the history
Allow for using custom json codecs in server and client
  • Loading branch information
hozan23 authored Dec 4, 2024
2 parents f9036b9 + dfdfed5 commit 9621f47
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 34 deletions.
85 changes: 79 additions & 6 deletions jsonrpc/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use karyon_net::ToEndpoint;
#[cfg(feature = "tls")]
use karyon_net::async_rustls::rustls;

use crate::codec::{ClonableJsonCodec, JsonCodec};
use crate::Result;
#[cfg(feature = "tcp")]
use crate::{Error, TcpConfig};
Expand All @@ -17,7 +18,7 @@ const DEFAULT_TIMEOUT: u64 = 3000; // 3s

const DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE: usize = 20000;

impl Client {
impl Client<JsonCodec> {
/// Creates a new [`ClientBuilder`]
///
/// This function initializes a `ClientBuilder` with the specified endpoint.
Expand All @@ -34,11 +35,80 @@ impl Client {
/// .expect("Build a new client");
/// };
/// ```
pub fn builder(endpoint: impl ToEndpoint) -> Result<ClientBuilder> {
pub fn builder(endpoint: impl ToEndpoint) -> Result<ClientBuilder<JsonCodec>> {
Client::<JsonCodec>::builder_with_json_codec(endpoint, JsonCodec {})
}
}

impl<C> Client<C>
where
C: ClonableJsonCodec,
{
/// Creates a new [`ClientBuilder`]
///
/// This function initializes a `ClientBuilder` with the specified endpoint
/// and the given json codec.
/// # Example
///
/// ```
/// use karyon_jsonrpc::Client;
/// use karyon_net::{codec::{Codec, Decoder, Encoder}, Error, Result};
///
/// use serde_json::Value;
///
/// #[derive(Clone)]
/// pub struct CustomJsonCodec {}
///
/// impl Codec for CustomJsonCodec {
/// type Item = serde_json::Value;
/// }
///
/// impl Encoder for CustomJsonCodec {
/// type EnItem = serde_json::Value;
/// fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result<usize> {
/// let msg = match serde_json::to_string(src) {
/// Ok(m) => m,
/// Err(err) => return Err(Error::Encode(err.to_string())),
/// };
/// let buf = msg.as_bytes();
/// dst[..buf.len()].copy_from_slice(buf);
/// Ok(buf.len())
/// }
/// }
///
/// impl Decoder for CustomJsonCodec {
/// type DeItem = serde_json::Value;
/// fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeItem)>> {
/// let de = serde_json::Deserializer::from_slice(src);
/// let mut iter = de.into_iter::<serde_json::Value>();
///
/// let item = match iter.next() {
/// Some(Ok(item)) => item,
/// Some(Err(ref e)) if e.is_eof() => return Ok(None),
/// Some(Err(e)) => return Err(Error::Decode(e.to_string())),
/// None => return Ok(None),
/// };
///
/// Ok(Some((iter.byte_offset(), item)))
/// }
/// }
///
/// async {
/// let builder = Client::builder_with_json_codec("tcp://127.0.0.1:3000", CustomJsonCodec {})
/// .expect("Create a new client builder with a custom json codec");
/// let client = builder.build().await
/// .expect("Build a new client");
/// };
/// ```
pub fn builder_with_json_codec(
endpoint: impl ToEndpoint,
json_codec: C,
) -> Result<ClientBuilder<C>> {
let endpoint = endpoint.to_endpoint()?;
Ok(ClientBuilder {
inner: ClientConfig {
endpoint,
json_codec,
timeout: Some(DEFAULT_TIMEOUT),
#[cfg(feature = "tcp")]
tcp_config: Default::default(),
Expand All @@ -51,11 +121,14 @@ impl Client {
}

/// Builder for constructing an RPC [`Client`].
pub struct ClientBuilder {
inner: ClientConfig,
pub struct ClientBuilder<C> {
inner: ClientConfig<C>,
}

impl ClientBuilder {
impl<C> ClientBuilder<C>
where
C: ClonableJsonCodec,
{
/// Set timeout for receiving messages, in milliseconds. Requests will
/// fail if it takes longer.
///
Expand Down Expand Up @@ -191,7 +264,7 @@ impl ClientBuilder {
/// };
///
/// ```
pub async fn build(self) -> Result<Arc<Client>> {
pub async fn build(self) -> Result<Arc<Client<C>>> {
let client = Client::init(self.inner).await?;
Ok(client)
}
Expand Down
23 changes: 14 additions & 9 deletions jsonrpc/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ use karyon_core::{
util::random_32,
};

use crate::codec::ClonableJsonCodec;
#[cfg(feature = "ws")]
use crate::codec::WsJsonCodec;

use crate::{
codec::JsonCodec,
message::{self, SubscriptionID},
Error, Result,
};
Expand All @@ -45,8 +45,9 @@ use subscriptions::Subscriptions;

type RequestID = u32;

struct ClientConfig {
struct ClientConfig<C> {
endpoint: Endpoint,
json_codec: C,
#[cfg(feature = "tcp")]
tcp_config: TcpConfig,
#[cfg(feature = "tls")]
Expand All @@ -56,13 +57,13 @@ struct ClientConfig {
}

/// Represents an RPC client
pub struct Client {
pub struct Client<C> {
disconnect: AtomicBool,
message_dispatcher: MessageDispatcher,
subscriptions: Arc<Subscriptions>,
send_chan: (Sender<serde_json::Value>, Receiver<serde_json::Value>),
task_group: TaskGroup,
config: ClientConfig,
config: ClientConfig<C>,
}

#[derive(Serialize, Deserialize)]
Expand All @@ -72,7 +73,10 @@ enum NewMsg {
Response(message::Response),
}

impl Client {
impl<C> Client<C>
where
C: ClonableJsonCodec,
{
/// Calls the provided method, waits for the response, and returns the result.
pub async fn call<T: Serialize + DeserializeOwned, V: DeserializeOwned>(
&self,
Expand Down Expand Up @@ -181,7 +185,7 @@ impl Client {
}

/// Initializes a new [`Client`] from the provided [`ClientConfig`].
async fn init(config: ClientConfig) -> Result<Arc<Self>> {
async fn init(config: ClientConfig<C>) -> Result<Arc<Self>> {
let client = Arc::new(Client {
disconnect: AtomicBool::new(false),
subscriptions: Subscriptions::new(config.subscription_buffer_size),
Expand All @@ -202,10 +206,11 @@ impl Client {

async fn connect(self: &Arc<Self>) -> Result<Conn<serde_json::Value>> {
let endpoint = self.config.endpoint.clone();
let json_codec = self.config.json_codec.clone();
let conn: Conn<serde_json::Value> = match endpoint {
#[cfg(feature = "tcp")]
Endpoint::Tcp(..) => Box::new(
karyon_net::tcp::dial(&endpoint, self.config.tcp_config.clone(), JsonCodec {})
karyon_net::tcp::dial(&endpoint, self.config.tcp_config.clone(), json_codec)
.await?,
),
#[cfg(feature = "tls")]
Expand All @@ -218,7 +223,7 @@ impl Client {
client_config: conf.clone(),
tcp_config: self.config.tcp_config.clone(),
},
JsonCodec {},
json_codec,
)
.await?,
),
Expand Down Expand Up @@ -252,7 +257,7 @@ impl Client {
},
#[cfg(all(feature = "unix", target_family = "unix"))]
Endpoint::Unix(..) => {
Box::new(karyon_net::unix::dial(&endpoint, Default::default(), JsonCodec {}).await?)
Box::new(karyon_net::unix::dial(&endpoint, Default::default(), json_codec).await?)
}
_ => return Err(Error::UnsupportedProtocol(endpoint.to_string())),
};
Expand Down
11 changes: 11 additions & 0 deletions jsonrpc/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ use karyon_net::{
#[cfg(feature = "ws")]
use karyon_net::codec::{WebSocketCodec, WebSocketDecoder, WebSocketEncoder};

pub trait ClonableJsonCodec:
Codec<Item = serde_json::Value, DeItem = serde_json::Value, EnItem = serde_json::Value> + Clone
{
}
impl<
T: Codec<Item = serde_json::Value, DeItem = serde_json::Value, EnItem = serde_json::Value>
+ Clone,
> ClonableJsonCodec for T
{
}

#[derive(Clone)]
pub struct JsonCodec {}

Expand Down
89 changes: 80 additions & 9 deletions jsonrpc/src/server/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@ use karyon_net::ToEndpoint;
#[cfg(feature = "tls")]
use karyon_net::async_rustls::rustls;

use crate::codec::{ClonableJsonCodec, JsonCodec};
#[cfg(feature = "tcp")]
use crate::{Error, TcpConfig};
use crate::{PubSubRPCService, RPCService, Result};

use super::{Server, ServerConfig};

/// Builder for constructing an RPC [`Server`].
pub struct ServerBuilder {
inner: ServerConfig,
pub struct ServerBuilder<C> {
inner: ServerConfig<C>,
}

impl ServerBuilder {
impl<C> ServerBuilder<C>
where
C: ClonableJsonCodec,
{
/// Adds a new RPC service to the server.
///
/// # Example
Expand Down Expand Up @@ -136,7 +140,7 @@ impl ServerBuilder {
///
/// This function will return an error if the endpoint does not support TCP protocols.
#[cfg(feature = "tcp")]
pub fn tcp_config(mut self, config: TcpConfig) -> Result<ServerBuilder> {
pub fn tcp_config(mut self, config: TcpConfig) -> Result<ServerBuilder<C>> {
match self.inner.endpoint {
Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
self.inner.tcp_config = config;
Expand Down Expand Up @@ -167,7 +171,7 @@ impl ServerBuilder {
///
/// This function will return an error if the endpoint does not support TLS protocols.
#[cfg(feature = "tls")]
pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result<ServerBuilder> {
pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result<ServerBuilder<C>> {
match self.inner.endpoint {
Endpoint::Tls(..) | Endpoint::Wss(..) => {
self.inner.tls_config = Some(config);
Expand All @@ -181,17 +185,17 @@ impl ServerBuilder {
}

/// Builds the server with the configured options.
pub async fn build(self) -> Result<Arc<Server>> {
pub async fn build(self) -> Result<Arc<Server<C>>> {
Server::init(self.inner, None).await
}

/// Builds the server with the configured options and an executor.
pub async fn build_with_executor(self, ex: Executor) -> Result<Arc<Server>> {
pub async fn build_with_executor(self, ex: Executor) -> Result<Arc<Server<C>>> {
Server::init(self.inner, Some(ex)).await
}
}

impl Server {
impl Server<JsonCodec> {
/// Creates a new [`ServerBuilder`]
///
/// This function initializes a `ServerBuilder` with the specified endpoint.
Expand All @@ -207,11 +211,78 @@ impl Server {
/// .expect("Build the server");
/// };
/// ```
pub fn builder(endpoint: impl ToEndpoint) -> Result<ServerBuilder> {
pub fn builder(endpoint: impl ToEndpoint) -> Result<ServerBuilder<JsonCodec>> {
Server::<JsonCodec>::builder_with_json_codec(endpoint, JsonCodec {})
}
}

impl<C> Server<C> {
/// Creates a new [`ServerBuilder`]
///
/// This function initializes a `ServerBuilder` with the specified endpoint
/// and the given json codec.
///
/// # Example
///
/// ```
/// use karyon_jsonrpc::Server;
/// use karyon_net::{codec::{Codec, Decoder, Encoder}, Error, Result};
///
/// use serde_json::Value;
///
/// #[derive(Clone)]
/// pub struct CustomJsonCodec {}
///
/// impl Codec for CustomJsonCodec {
/// type Item = serde_json::Value;
/// }
///
/// impl Encoder for CustomJsonCodec {
/// type EnItem = serde_json::Value;
/// fn encode(&self, src: &Self::EnItem, dst: &mut [u8]) -> Result<usize> {
/// let msg = match serde_json::to_string(src) {
/// Ok(m) => m,
/// Err(err) => return Err(Error::Encode(err.to_string())),
/// };
/// let buf = msg.as_bytes();
/// dst[..buf.len()].copy_from_slice(buf);
/// Ok(buf.len())
/// }
/// }
///
/// impl Decoder for CustomJsonCodec {
/// type DeItem = serde_json::Value;
/// fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeItem)>> {
/// let de = serde_json::Deserializer::from_slice(src);
/// let mut iter = de.into_iter::<serde_json::Value>();
///
/// let item = match iter.next() {
/// Some(Ok(item)) => item,
/// Some(Err(ref e)) if e.is_eof() => return Ok(None),
/// Some(Err(e)) => return Err(Error::Decode(e.to_string())),
/// None => return Ok(None),
/// };
///
/// Ok(Some((iter.byte_offset(), item)))
/// }
/// }
///
/// async {
/// let server = Server::builder_with_json_codec("tcp://127.0.0.1:3000", CustomJsonCodec {})
/// .expect("Create a new server builder with custom json codec")
/// .build().await
/// .expect("Build the server");
/// };
/// ```
pub fn builder_with_json_codec(
endpoint: impl ToEndpoint,
json_codec: C,
) -> Result<ServerBuilder<C>> {
let endpoint = endpoint.to_endpoint()?;
Ok(ServerBuilder {
inner: ServerConfig {
endpoint,
json_codec,
services: HashMap::new(),
pubsub_services: HashMap::new(),
#[cfg(feature = "tcp")]
Expand Down
Loading

0 comments on commit 9621f47

Please sign in to comment.