Skip to content

Commit

Permalink
Implement next API for ws_stream_wasm
Browse files Browse the repository at this point in the history
  • Loading branch information
obmarg committed Feb 3, 2024
1 parent 3e19306 commit b29d7b6
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 8 deletions.
1 change: 1 addition & 0 deletions examples-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ log = "0.4"
# wasm-specific
ws_stream_wasm = "0.7"
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
console_log = "1"

[dependencies.graphql-ws-client]
Expand Down
15 changes: 8 additions & 7 deletions examples-wasm/examples/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
//!
//! Talks to the the tide subscription example in `async-graphql`

use std::future::IntoFuture;

use graphql_ws_client::{next::ClientBuilder, ws_stream_wasm::Connection};

mod schema {
cynic::use_schema!("../schemas/books.graphql");
}
Expand Down Expand Up @@ -37,13 +41,12 @@ struct BooksChangedSubscription {
#[async_std::main]
async fn main() {
use futures::StreamExt;
use graphql_ws_client::CynicClientBuilder;
use log::info;
use wasm_bindgen::UnwrapThrowExt;

console_log::init_with_level(log::Level::Info).expect("init logging");

let (ws, wsio) = ws_stream_wasm::WsMeta::connect(
let ws_conn = ws_stream_wasm::WsMeta::connect(
"ws://localhost:8000/graphql",
Some(vec!["graphql-transport-ws"]),
)
Expand All @@ -52,12 +55,10 @@ async fn main() {

info!("Connected");

let (sink, stream) = graphql_ws_client::wasm_websocket_combined_split(ws, wsio).await;
let connection = Connection::new(ws_conn).await;

let mut client = CynicClientBuilder::new()
.build(stream, sink, async_executors::AsyncStd)
.await
.unwrap();
let (mut client, actor) = ClientBuilder::new().build(connection).await.unwrap();
wasm_bindgen_futures::spawn_local(actor.into_future());

let mut stream = client.streaming_operation(build_query()).await.unwrap();
info!("Running subscription");
Expand Down
6 changes: 5 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ pub mod next;
#[cfg(feature = "ws_stream_wasm")]
mod wasm;
#[cfg(feature = "ws_stream_wasm")]
pub use wasm::*;
pub use wasm::{wasm_websocket_combined_split, FusedWasmWebsocketSink, WasmWebsocketMessage};

#[cfg(feature = "ws_stream_wasm")]
/// Integration with the ws_stream_wasm library
pub mod ws_stream_wasm;

#[cfg(feature = "async-tungstenite")]
mod native;
Expand Down
89 changes: 89 additions & 0 deletions src/ws_stream_wasm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use futures::{FutureExt, SinkExt, StreamExt};
use pharos::{Observable, ObserveConfig};
use ws_stream_wasm::{WsEvent, WsMessage, WsMeta, WsStream};

use crate::Error;

/// A websocket connection for ws_stream_wasm
pub struct Connection {
messages: WsStream,
event_stream: pharos::Events<WsEvent>,
meta: WsMeta,
}

impl Connection {
/// Creates a new Connection from a WsMeta & WsTream combo
pub async fn new((mut meta, messages): (WsMeta, WsStream)) -> Self {
let event_stream = meta.observe(ObserveConfig::default()).await.unwrap();

Connection {
messages,
event_stream,
meta,
}
}
}

#[async_trait::async_trait]
impl crate::next::Connection for Connection {
async fn receive(&mut self) -> Option<crate::next::Message> {
use crate::next::Message;
loop {
match self.next().await? {
EventOrMessage::Event(WsEvent::Closed(close)) => {
return Some(Message::Close {
code: Some(close.code),
reason: Some(close.reason),
});
}
EventOrMessage::Event(WsEvent::Error | WsEvent::WsErr(_)) => {
return None;
}
EventOrMessage::Event(WsEvent::Open | WsEvent::Closing) => {
continue;
}

EventOrMessage::Message(WsMessage::Text(text)) => return Some(Message::Text(text)),

EventOrMessage::Message(WsMessage::Binary(_)) => {
// We shouldn't receive binary messages, but ignore them if we do
continue;
}
}
}
}

async fn send(&mut self, message: crate::next::Message) -> Result<(), Error> {
use crate::next::Message;

match message {
Message::Text(text) => self.messages.send(WsMessage::Text(text)).await,
Message::Close { code, reason } => match (code, reason) {
(Some(code), Some(reason)) => self.meta.close_reason(code, reason).await,
(Some(code), _) => self.meta.close_code(code).await,
_ => self.meta.close().await,
}
.map(|_| ()),
Message::Ping | Message::Pong => return Ok(()),
}
.map_err(|error| Error::Send(error.to_string()))
}
}

impl Connection {
async fn next(&mut self) -> Option<EventOrMessage> {
futures::select! {
event = self.event_stream.next().fuse() => {
event.map(EventOrMessage::Event)
}
message = self.messages.next().fuse() => {
message.map(EventOrMessage::Message)
}
}
}
}

enum EventOrMessage {
Event(WsEvent),
Message(WsMessage),
}

0 comments on commit b29d7b6

Please sign in to comment.