Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(engineio/ws): add read_buffer_size option and set default to 4KiB #450

Merged
merged 5 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions .github/workflows/bench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
heaptrack:
description: 'Run heaptrack memory benchmark'
description: "Run heaptrack memory benchmark"
required: true
default: false
type: boolean
Expand All @@ -18,6 +18,9 @@ jobs:
- uses: dtolnay/rust-toolchain@master
with:
toolchain: stable
- uses: actions/setup-node@v4
with:
node-version: 22
- uses: actions/cache@v4
with:
path: |
Expand All @@ -30,17 +33,22 @@ jobs:
- name: Install heaptrack
run: sudo apt-get install -y heaptrack
- name: Build server && client
run: cargo build -r -p heaptrack && cargo build -r -p heaptrack --bin heaptrack-client
run: cargo build -r -p heaptrack && npm i --prefix e2e/heaptrack
- name: Run memory benchmark
run: heaptrack target/release/heaptrack > server.txt & ./target/release/heaptrack-client > client.txt
- name: Server output
if: always()
run: cat server.txt
- name: Client output
if: always()
run: cat client.txt
run: |
ulimit -n 20000
RUST_LOG="socketioxide=trace,engineioxide=trace,info" heaptrack target/release/heaptrack > server.txt &
cd e2e/heaptrack
node --experimental-strip-types client.ts > ../../client.txt &
sleep 60
kill $!
- name: Publish logs
uses: actions/upload-artifact@v4
with:
name: logs-${{ github.head_ref }}.${{ github.sha }}
path: *.txt
- name: Publish memory benchmark
uses: actions/upload-artifact@v4
with:
name: heaptrack-${{ github.head_ref }}.${{ github.sha }}
path: heaptrack.heaptrack.*
path: heaptrack.heaptrack.*
20 changes: 19 additions & 1 deletion crates/engineioxide/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,16 @@ pub struct EngineIoConfig {
pub max_buffer_size: usize,

/// The maximum number of bytes that can be received per http request.
/// Defaults to 100kb.
/// Defaults to 100KB.
pub max_payload: u64,

/// The size of the read buffer for the websocket transport.
/// You can tweak this value depending on your use case. By default it is set to 4KiB.
///
/// Setting it to a higher value will improve performance on heavy read scenarios
/// but will consume more memory.
pub ws_read_buffer_size: usize,

/// Allowed transports on this server
/// It is represented as a bitfield to allow to combine any number of transports easily
pub transports: u8,
Expand All @@ -73,6 +80,7 @@ impl Default for EngineIoConfig {
ping_timeout: Duration::from_millis(20000),
max_buffer_size: 128,
max_payload: 1e5 as u64, // 100kb
ws_read_buffer_size: 4096,
transports: TransportType::Polling as u8 | TransportType::Websocket as u8,
}
}
Expand Down Expand Up @@ -173,6 +181,16 @@ impl EngineIoConfigBuilder {
self
}

/// The size of the read buffer for the websocket transport.
/// You can tweak this value depending on your use case. Defaults to 4KiB.
///
/// Setting it to a higher value will improve performance on heavy read scenarios
/// but will consume more memory.
pub fn ws_read_buffer_size(mut self, ws_read_buffer_size: usize) -> Self {
self.config.ws_read_buffer_size = ws_read_buffer_size;
self
}

/// Allowed transports on this server
///
/// The `transports` array should have a size of 1 or 2
Expand Down
9 changes: 7 additions & 2 deletions crates/engineioxide/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ use tokio::{
task::JoinHandle,
};
use tokio_tungstenite::{
tungstenite::{handshake::derive_accept_key, protocol::Role, Message},
tungstenite::{
handshake::derive_accept_key,
protocol::{Role, WebSocketConfig},
Message,
},
WebSocketStream,
};

Expand Down Expand Up @@ -110,7 +114,8 @@ pub async fn on_init<H: EngineIoHandler, S>(
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let ws_init = move || WebSocketStream::from_raw_socket(conn, Role::Server, None);
let ws_config = WebSocketConfig::default().read_buffer_size(engine.config.ws_read_buffer_size);
let ws_init = move || WebSocketStream::from_raw_socket(conn, Role::Server, Some(ws_config));
let (socket, ws) = if let Some(sid) = sid {
match engine.get_socket(sid) {
None => return Err(Error::UnknownSessionID(sid)),
Expand Down
4 changes: 3 additions & 1 deletion crates/parser-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ impl Parse for CommonParser {
..
}) => {
let binaries = binaries.get_or_insert(VecDeque::new());
binaries.push_back(data);
// We copy the data to avoid holding a ref to the engine.io
// websocket buffer too long.
binaries.push_back(Bytes::copy_from_slice(&data));
if state.incoming_binary_cnt.load(Ordering::Relaxed) > binaries.len() {
Err(ParseError::NeedsMoreBinaryData)
} else {
Expand Down
19 changes: 9 additions & 10 deletions crates/socketioxide/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<A: Adapter> Client<A> {
fn sock_connect(
self: &Arc<Self>,
auth: Option<Value>,
ns_path: Str,
ns_path: &str,
esocket: &Arc<engineioxide::Socket<SocketData<A>>>,
) {
#[cfg(feature = "tracing")]
Expand All @@ -77,12 +77,10 @@ impl<A: Adapter> Client<A> {
}
};

if let Some(ns) = self.get_ns(&ns_path) {
if let Some(ns) = self.get_ns(ns_path) {
tokio::spawn(connect(ns, esocket.clone()));
} else if let Ok(Match { value: ns_ctr, .. }) = self.router.read().unwrap().at(&ns_path) {
// We have to create a new `Str` otherwise, we would keep a ref to the original connect packet
// for the entire lifetime of the Namespace.
let path = Str::copy_from_slice(&ns_path);
} else if let Ok(Match { value: ns_ctr, .. }) = self.router.read().unwrap().at(ns_path) {
let path = Str::copy_from_slice(ns_path);
let ns = ns_ctr.get_new_ns(path.clone(), &self.adapter_state, &self.config);
let this = self.clone();
let esocket = esocket.clone();
Expand All @@ -100,9 +98,10 @@ impl<A: Adapter> Client<A> {
);
esocket.close(EIoDisconnectReason::TransportClose);
} else {
let path = Str::copy_from_slice(ns_path);
let packet = self
.parser()
.encode(Packet::connect_error(ns_path, "Invalid namespace"));
.encode(Packet::connect_error(path, "Invalid namespace"));
let _ = match packet {
Value::Str(p, _) => esocket.emit(p).map_err(|_e| {
#[cfg(feature = "tracing")]
Expand Down Expand Up @@ -257,7 +256,7 @@ impl<A: Adapter> EngineIoHandler for Client<A> {
ProtocolVersion::V4 => {
#[cfg(feature = "tracing")]
tracing::debug!("connecting to default namespace for v4");
self.sock_connect(None, Str::from("/"), &socket);
self.sock_connect(None, "/", &socket);
}
ProtocolVersion::V5 => self.spawn_connect_timeout_task(socket),
}
Expand Down Expand Up @@ -302,7 +301,7 @@ impl<A: Adapter> EngineIoHandler for Client<A> {

let res: Result<(), Error> = match packet.inner {
PacketData::Connect(auth) => {
self.sock_connect(auth, packet.ns, &socket);
self.sock_connect(auth, &packet.ns, &socket);
Ok(())
}
_ => self.sock_propagate_packet(packet, socket.id),
Expand Down Expand Up @@ -339,7 +338,7 @@ impl<A: Adapter> EngineIoHandler for Client<A> {

let res: Result<(), Error> = match packet.inner {
PacketData::Connect(auth) => {
self.sock_connect(auth, packet.ns, &socket);
self.sock_connect(auth, &packet.ns, &socket);
Ok(())
}
_ => self.sock_propagate_packet(packet, socket.id),
Expand Down
13 changes: 13 additions & 0 deletions crates/socketioxide/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,19 @@ impl<A: Adapter> SocketIoBuilder<A> {
self
}

/// The size of the read buffer for the websocket transport.
/// You can tweak this value depending on your use case. Defaults to 4KiB.
///
/// Setting it to a higher value will improve performance on heavy read scenarios
/// but will consume more memory.
#[inline]
pub fn ws_read_buffer_size(mut self, ws_read_buffer_size: usize) -> Self {
self.engine_config_builder = self
.engine_config_builder
.ws_read_buffer_size(ws_read_buffer_size);
self
}

/// Allowed transports on this server
///
/// The `transports` array should have a size of 1 or 2
Expand Down
4 changes: 2 additions & 2 deletions e2e/heaptrack/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
*.gz
client/node_modules
memory_usage.svg
node_modules
memory_usage.svg
10 changes: 3 additions & 7 deletions e2e/heaptrack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
socketioxide = { path = "../../crates/socketioxide" }
socketioxide = { path = "../../crates/socketioxide", features = ["tracing"] }
hyper = { workspace = true, features = ["server", "http1"] }
hyper-util = { workspace = true, features = ["tokio"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] }
rust_socketio = { version = "0.6.0", features = ["async"] }
serde_json = "1.0.68"
tracing-subscriber.workspace = true
serde_json.workspace = true
rand = "0.8.4"

[[bin]]
name = "heaptrack-client"
path = "src/client.rs"
56 changes: 56 additions & 0 deletions e2e/heaptrack/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { io } from "socket.io-client";

const URL = process.env.URL || "http://localhost:3000";
const MAX_CLIENTS = 5000;
const POLLING_PERCENTAGE = 0.05;
const CLIENT_CREATION_INTERVAL_IN_MS = 1;
const EMIT_INTERVAL_IN_MS = 1000;

let clientCount = 0;
let lastReport = new Date().getTime();
let packetsSinceLastReport = 0;

const createClient = () => {
// for demonstration purposes, some clients stay stuck in HTTP long-polling
const transports =
Math.random() < POLLING_PERCENTAGE ? ["polling"] : ["polling", "websocket"];

const socket = io(URL, {
transports,
});

setInterval(() => {
socket.emit("ping");
}, EMIT_INTERVAL_IN_MS);

socket.on("pong", () => {
packetsSinceLastReport++;
});

socket.on("disconnect", (reason) => {
console.log(`disconnect due to ${reason}`);
});

if (++clientCount < MAX_CLIENTS) {
setTimeout(createClient, CLIENT_CREATION_INTERVAL_IN_MS);
}
};

createClient();

const printReport = () => {
const now = new Date().getTime();
const durationSinceLastReport = (now - lastReport) / 1000;
const packetsPerSeconds = (
packetsSinceLastReport / durationSinceLastReport
).toFixed(2);

console.log(
`client count: ${clientCount} ; average packets received per second: ${packetsPerSeconds}`,
);

packetsSinceLastReport = 0;
lastReport = now;
};

setInterval(printReport, 5000);
Loading
Loading