Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom parser support #231

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions .idea/shelf/operators_single_rs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

341 changes: 341 additions & 0 deletions .idea/shelf/operators_single_rs/shelved.patch

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

415 changes: 415 additions & 0 deletions .idea/workspace.xml

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions examples/axum-echo/axum_echo.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use axum::routing::get;
use serde_json::Value;
use socketioxide::{
extract::{AckSender, Bin, Data, SocketRef},
SocketIo,
};
use socketioxide::{extract::{AckSender, Bin, Data, SocketRef}, SocketIo, SocketIoBuilder};

Check warning

Code scanning / clippy

unused import: SocketIo Warning

unused import: SocketIo
use tracing::info;
use tracing_subscriber::FmtSubscriber;
use socketioxide::parser::MsgpackParser;

fn on_connect(socket: SocketRef, Data(data): Data<Value>) {
info!("Socket.IO connected: {:?} {:?}", socket.ns(), socket.id);
Expand All @@ -32,7 +30,9 @@
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing::subscriber::set_global_default(FmtSubscriber::default())?;

let (layer, io) = SocketIo::new_layer();
let (layer, io) = SocketIoBuilder::new()
.with_parser(MsgpackParser::default())

Check warning

Code scanning / clippy

use of default to create a unit struct Warning

use of default to create a unit struct
.build_layer();

io.ns("/", on_connect);
io.ns("/custom", on_connect);
Expand Down
13 changes: 13 additions & 0 deletions examples/axum-echo/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import io from "socket.io-client";
import parser from "./parser.js";

const enc = new TextEncoder();
const socket = io(`ws://127.0.0.1:3000/`, {
// parser: parser
});

socket.on("connect", () => {
socket.emit("message-with-ack", { data: "nice" }, enc.encode("Some nice broadcasting yk =)"), (...response) => {
console.log("\x1b[33msqw:broadcast response:", JSON.stringify(response), "\x1b[0m");
});
})
1 change: 1 addition & 0 deletions socketioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ thiserror.workspace = true
itoa.workspace = true
hyper.workspace = true
pin-project-lite.workspace = true
dyn-clone = "1.0.16"

# Extensions
dashmap = { version = "5.4.0", optional = true }
Expand Down
37 changes: 7 additions & 30 deletions socketioxide/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl<A: Adapter> EngineIoHandler for Client<A> {
fn on_message(&self, msg: String, socket: Arc<EIoSocket<SocketData>>) {
#[cfg(feature = "tracing")]
tracing::debug!("Received message: {:?}", msg);
let packet = match Packet::try_from(msg) {
let packet = match self.config.parser.decode_msg(msg, socket.clone()) {
Ok(packet) => packet,
Err(_e) => {
#[cfg(feature = "tracing")]
Expand Down Expand Up @@ -242,42 +242,19 @@ impl<A: Adapter> EngineIoHandler for Client<A> {
///
/// If the packet is complete, it is propagated to the namespace
fn on_binary(&self, data: Vec<u8>, socket: Arc<EIoSocket<SocketData>>) {
if apply_payload_on_packet(data, &socket) {
if let Some(packet) = socket.data.partial_bin_packet.lock().unwrap().take() {
if let Err(ref err) = self.sock_propagate_packet(packet, socket.id) {
#[cfg(feature = "tracing")]
tracing::debug!(
if let Some(packet) = self.config.parser.decode_bin(data, socket.clone()) {
if let Err(ref err) = self.sock_propagate_packet(packet, socket.id) {
#[cfg(feature = "tracing")]
tracing::debug!(
"error while propagating packet to socket {}: {}",
socket.id,
err
);
if let Some(reason) = err.into() {
socket.close(reason);
}
if let Some(reason) = err.into() {
socket.close(reason);
}
}
}
}
}

/// Utility that applies an incoming binary payload to a partial binary packet
/// waiting to be filled with all the payloads
///
/// Returns true if the packet is complete and should be processed
fn apply_payload_on_packet(data: Vec<u8>, socket: &EIoSocket<SocketData>) -> bool {
#[cfg(feature = "tracing")]
tracing::debug!("[sid={}] applying payload on packet", socket.id);
if let Some(ref mut packet) = *socket.data.partial_bin_packet.lock().unwrap() {
match packet.inner {
PacketData::BinaryEvent(_, ref mut bin, _) | PacketData::BinaryAck(ref mut bin, _) => {
bin.add_payload(data);
bin.is_complete()
}
_ => unreachable!("partial_bin_packet should only be set for binary packets"),
}
} else {
#[cfg(feature = "tracing")]
tracing::debug!("[sid={}] socket received unexpected bin data", socket.id);
false
}
}
12 changes: 12 additions & 0 deletions socketioxide/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
service::SocketIoService,
BroadcastError, DisconnectError,
};
use crate::parser::{DefaultParser, Parser};

/// Configuration for Socket.IO & Engine.IO
#[derive(Debug, Clone)]
Expand All @@ -34,6 +35,9 @@
///
/// Defaults to 45 seconds.
pub connect_timeout: Duration,

/// A custom parser that encodes and decodes packets
pub parser: Box<dyn Parser>
}

impl Default for SocketIoConfig {
Expand All @@ -45,6 +49,7 @@
},
ack_timeout: Duration::from_secs(5),
connect_timeout: Duration::from_secs(45),
parser: Box::new(DefaultParser::default())

Check warning

Code scanning / clippy

Box::new(_) of default value Warning

Box::new(\_) of default value

Check warning

Code scanning / clippy

use of default to create a unit struct Warning

use of default to create a unit struct
}
}
}
Expand Down Expand Up @@ -152,6 +157,13 @@
self
}

/// Sets a custom [`Parser`] for encoding and decoding packets for this [`SocketIoBuilder`].
/// Can be used to implement a custom protocol.
pub fn with_parser<P : Parser + Send + Sync + 'static>(mut self, parser: P) -> Self {
self.config.parser = Box::new(parser);
self
}

/// Sets a custom [`Adapter`] for this [`SocketIoBuilder`]
pub fn with_adapter<B: Adapter>(self) -> SocketIoBuilder<B> {
SocketIoBuilder {
Expand Down
1 change: 1 addition & 0 deletions socketioxide/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@
pub mod packet;
pub mod service;
pub mod socket;
pub mod parser;

Check warning

Code scanning / clippy

missing documentation for a module Warning

missing documentation for a module

pub use engineioxide::TransportType;
pub use errors::{AckError, AdapterError, BroadcastError, DisconnectError, SendError, SocketError};
Expand Down
2 changes: 1 addition & 1 deletion socketioxide/src/operators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ impl<A: Adapter> Operators<A> {
}
}

#[cfg(feature = "test-utils")]
// #[cfg(feature = "test-utils")]
impl<A: Adapter> Operators<A> {
#[allow(dead_code)]
pub(crate) fn is_broadcast(&self) -> bool {
Expand Down
Loading
Loading