Skip to content

Commit

Permalink
fix: always try to flush after ready
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Oct 3, 2023
1 parent 0adb14b commit a53f059
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 35 deletions.
29 changes: 29 additions & 0 deletions examples/simple-google.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use chromiumoxide::browser::BrowserConfigBuilder;
use chromiumoxide::Browser;
use futures::StreamExt;
use std::time::Duration;
use tokio::task;

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let (browser, mut handler) = Browser::launch(
BrowserConfigBuilder::default()
.request_timeout(Duration::from_secs(5))
.build()
.unwrap(),
)
.await
.unwrap();

let h = task::spawn(async move {
while let Some(h) = handler.next().await {
h.unwrap();
}
});

let page = browser.new_page("https://www.google.com").await.unwrap();

println!("loaded page {:?}", page);
h.await.unwrap();
}
69 changes: 35 additions & 34 deletions src/conn.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::ready;

use async_tungstenite::{tungstenite::protocol::WebSocketConfig, WebSocketStream};
use futures::stream::Stream;
use futures::task::{Context, Poll};
use futures::Sink;
use futures::{SinkExt, StreamExt};

use chromiumoxide_cdp::cdp::browser_protocol::target::SessionId;
use chromiumoxide_types::{CallId, EventMessage, Message, MethodCall, MethodId};
Expand Down Expand Up @@ -93,19 +94,15 @@ impl<T: EventMessage> Connection<T> {
/// sink
fn start_send_next(&mut self, cx: &mut Context<'_>) -> Result<()> {
if self.needs_flush {
if let Poll::Ready(Ok(())) = Sink::poll_flush(Pin::new(&mut self.ws), cx) {
if let Poll::Ready(Ok(())) = self.ws.poll_flush_unpin(cx) {
self.needs_flush = false;
}
}
if self.pending_flush.is_none() && !self.needs_flush {
if let Some(cmd) = self.pending_commands.pop_front() {
// if cmd.id.to_string().contains("1") {
// log::error!("CMD {:?}", cmd);
// return Ok(())
// }
tracing::trace!("Sending {:?}", cmd);
let msg = serde_json::to_string(&cmd)?;
Sink::start_send(Pin::new(&mut self.ws), msg.into())?;
self.ws.start_send_unpin(msg.into())?;
self.pending_flush = Some(cmd);
}
}
Expand All @@ -119,38 +116,42 @@ impl<T: EventMessage + Unpin> Stream for Connection<T> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let pin = self.get_mut();

// queue in the next message if not currently flushing
if let Err(err) = pin.start_send_next(cx) {
return Poll::Ready(Some(Err(err)));
}
loop {
// queue in the next message if not currently flushing
if let Err(err) = pin.start_send_next(cx) {
return Poll::Ready(Some(Err(err)));
}

// send the message
if let Some(call) = pin.pending_flush.take() {
if Sink::poll_ready(Pin::new(&mut pin.ws), cx).is_ready() {
pin.needs_flush = true;
} else {
pin.pending_flush = Some(call);
// send the message
if let Some(call) = pin.pending_flush.take() {
if pin.ws.poll_ready_unpin(cx).is_ready() {
pin.needs_flush = true;
// try another flush
continue;
} else {
pin.pending_flush = Some(call);
}
}
break;
}

// read from the ws
match Stream::poll_next(Pin::new(&mut pin.ws), cx) {
Poll::Ready(Some(Ok(msg))) => {
return match serde_json::from_slice::<Message<T>>(&msg.into_data()) {
Ok(msg) => {
tracing::trace!("Received {:?}", msg);
Poll::Ready(Some(Ok(msg)))
}
Err(err) => {
tracing::error!("Failed to deserialize WS response {}", err);
Poll::Ready(Some(Err(err.into())))
}
};
}
Poll::Ready(Some(Err(err))) => {
return Poll::Ready(Some(Err(CdpError::Ws(err))));
match ready!(pin.ws.poll_next_unpin(cx)) {
Some(Ok(msg)) => match serde_json::from_slice::<Message<T>>(&msg.into_data()) {
Ok(msg) => {
tracing::trace!("Received {:?}", msg);
Poll::Ready(Some(Ok(msg)))
}
Err(err) => {
tracing::error!("Failed to deserialize WS response {}", err);
Poll::Ready(Some(Err(err.into())))
}
},
Some(Err(err)) => Poll::Ready(Some(Err(CdpError::Ws(err)))),
None => {
// ws connection closed
Poll::Ready(None)
}
_ => {}
}
Poll::Pending
}
}
2 changes: 1 addition & 1 deletion src/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl EventListeners {
method,
kind,
} = req;
let subs = self.listeners.entry(method).or_insert_with(Vec::new);
let subs = self.listeners.entry(method).or_default();
subs.push(EventListener {
listener,
kind,
Expand Down

0 comments on commit a53f059

Please sign in to comment.