From fba460a53e1a6496dce4262a7d5778f55da3e250 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Sun, 17 Dec 2023 17:47:22 -0300 Subject: [PATCH 1/4] Log raw WS messages to facilitate the debugging --- src/conn.rs | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/conn.rs b/src/conn.rs index 46d04b5d..ddf53ff5 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -136,22 +136,28 @@ impl Stream for Connection { } // read from the ws - match ready!(pin.ws.poll_next_unpin(cx)) { - Some(Ok(msg)) => match serde_json::from_slice::>(&msg.into_data()) { + let msg = match ready!(pin.ws.poll_next_unpin(cx)) { + Some(Ok(msg)) => msg, + Some(Err(err)) => return Poll::Ready(Some(Err(CdpError::Ws(err)))), + None => { + // ws connection closed + return Poll::Ready(None); + } + }; + + tracing::trace!(target: "chromiumoxide::conn::raw_ws", ?msg, "Got raw WS message"); + + Poll::Ready(Some( + match serde_json::from_slice::>(&msg.into_data()) { Ok(msg) => { tracing::trace!("Received {:?}", msg); - Poll::Ready(Some(Ok(msg))) + Ok(msg) } Err(err) => { tracing::error!("Failed to deserialize WS response {}", err); - Poll::Ready(Some(Err(err.into()))) + Err(err.into()) } }, - Some(Err(err)) => Poll::Ready(Some(Err(CdpError::Ws(err)))), - None => { - // ws connection closed - Poll::Ready(None) - } - } + )) } } From fa58a28dac1fab423017f5d3fd9f011ee2d0f5a9 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Sun, 17 Dec 2023 18:39:21 -0300 Subject: [PATCH 2/4] Add debug-raw-ws-messages feature --- Cargo.toml | 2 ++ src/conn.rs | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index ff72856b..8c544c0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,8 @@ _fetcher-rusttls-tokio = ["fetcher", "chromiumoxide_fetcher/_rustls-tokio"] _fetcher-native-async-std = ["fetcher", "chromiumoxide_fetcher/_native-async-std"] _fetcher-native-tokio = ["fetcher", "chromiumoxide_fetcher/_native-tokio"] +debug-raw-ws-messages = [] + [[example]] name = "wiki-tokio" required-features = ["tokio-runtime"] diff --git a/src/conn.rs b/src/conn.rs index ddf53ff5..5b8e40b4 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -147,6 +147,9 @@ impl Stream for Connection { tracing::trace!(target: "chromiumoxide::conn::raw_ws", ?msg, "Got raw WS message"); + #[cfg(feature = "debug-raw-ws-messages")] + let msg_for_debug = msg.clone(); + Poll::Ready(Some( match serde_json::from_slice::>(&msg.into_data()) { Ok(msg) => { @@ -154,6 +157,9 @@ impl Stream for Connection { Ok(msg) } Err(err) => { + #[cfg(feature = "debug-raw-ws-messages")] + tracing::debug!(target: "chromiumoxide::conn::raw_ws::parse_errors", msg = ?msg_for_debug, "Failed to parse raw WS message"); + tracing::error!("Failed to deserialize WS response {}", err); Err(err.into()) } From c2841e2510d8f6988fcd1410c517c897ff9cca13 Mon Sep 17 00:00:00 2001 From: Ryo Hirayama Date: Wed, 14 Feb 2024 17:10:58 +0900 Subject: [PATCH 3/4] Emit an error on WsMessage::Text fails to be parsed --- Cargo.toml | 2 -- src/conn.rs | 52 ++++++++++++++++++++++++++-------------------------- src/error.rs | 3 +++ 3 files changed, 29 insertions(+), 28 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8c544c0d..ff72856b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,8 +69,6 @@ _fetcher-rusttls-tokio = ["fetcher", "chromiumoxide_fetcher/_rustls-tokio"] _fetcher-native-async-std = ["fetcher", "chromiumoxide_fetcher/_native-async-std"] _fetcher-native-tokio = ["fetcher", "chromiumoxide_fetcher/_native-tokio"] -debug-raw-ws-messages = [] - [[example]] name = "wiki-tokio" required-features = ["tokio-runtime"] diff --git a/src/conn.rs b/src/conn.rs index 5b8e40b4..884e617b 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -3,6 +3,7 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::ready; +use async_tungstenite::tungstenite::Message as WsMessage; use async_tungstenite::{tungstenite::protocol::WebSocketConfig, WebSocketStream}; use futures::stream::Stream; use futures::task::{Context, Poll}; @@ -132,38 +133,37 @@ impl Stream for Connection { pin.pending_flush = Some(call); } } + break; } // read from the ws - let msg = match ready!(pin.ws.poll_next_unpin(cx)) { - Some(Ok(msg)) => msg, - Some(Err(err)) => return Poll::Ready(Some(Err(CdpError::Ws(err)))), + match ready!(pin.ws.poll_next_unpin(cx)) { + Some(Ok(WsMessage::Text(text))) => { + let ready = match serde_json::from_str::>(&text) { + Ok(msg) => { + tracing::trace!("Received {:?}", msg); + Ok(msg) + } + Err(err) => { + tracing::debug!(target: "chromiumoxide::conn::raw_ws::parse_errors", msg = text, "Failed to parse raw WS message"); + tracing::error!("Failed to deserialize WS response {}", err); + Err(err.into()) + } + }; + Poll::Ready(Some(ready)) + } + Some(Ok(WsMessage::Close(_))) => Poll::Ready(None), + Some(Ok(WsMessage::Ping(_))) | Some(Ok(WsMessage::Pong(_))) => { + // ignore pings + Poll::Pending + } + Some(Ok(msg)) => Poll::Ready(Some(Err(CdpError::UnexpectedWsMessage(msg)))), + Some(Err(err)) => Poll::Ready(Some(Err(CdpError::Ws(err)))), None => { // ws connection closed - return Poll::Ready(None); + Poll::Ready(None) } - }; - - tracing::trace!(target: "chromiumoxide::conn::raw_ws", ?msg, "Got raw WS message"); - - #[cfg(feature = "debug-raw-ws-messages")] - let msg_for_debug = msg.clone(); - - Poll::Ready(Some( - match serde_json::from_slice::>(&msg.into_data()) { - Ok(msg) => { - tracing::trace!("Received {:?}", msg); - Ok(msg) - } - Err(err) => { - #[cfg(feature = "debug-raw-ws-messages")] - tracing::debug!(target: "chromiumoxide::conn::raw_ws::parse_errors", msg = ?msg_for_debug, "Failed to parse raw WS message"); - - tracing::error!("Failed to deserialize WS response {}", err); - Err(err.into()) - } - }, - )) + } } } diff --git a/src/error.rs b/src/error.rs index 99e45d4c..ce75bab4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,6 +4,7 @@ use std::process::ExitStatus; use std::time::Instant; use async_tungstenite::tungstenite; +use async_tungstenite::tungstenite::Message; use base64::DecodeError; use futures::channel::mpsc::SendError; use futures::channel::oneshot::Canceled; @@ -28,6 +29,8 @@ pub enum CdpError { Chrome(#[from] chromiumoxide_types::Error), #[error("Received no response from the chromium instance.")] NoResponse, + #[error("Received unexpected ws message: {0:?}")] + UnexpectedWsMessage(Message), #[error("{0}")] ChannelSendError(#[from] ChannelError), #[error("Browser process exited with status {0:?} before websocket URL could be resolved, stderr: {1:?}")] From cbc4db5f8c3ec592fe461974ef21f087a8733d2a Mon Sep 17 00:00:00 2001 From: Ryo Hirayama Date: Wed, 14 Feb 2024 22:09:22 +0900 Subject: [PATCH 4/4] Wake the task before returning Poll::Pending --- src/conn.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/conn.rs b/src/conn.rs index 884e617b..72c0d9ea 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -154,8 +154,9 @@ impl Stream for Connection { Poll::Ready(Some(ready)) } Some(Ok(WsMessage::Close(_))) => Poll::Ready(None), + // ignore ping and pong Some(Ok(WsMessage::Ping(_))) | Some(Ok(WsMessage::Pong(_))) => { - // ignore pings + cx.waker().wake_by_ref(); Poll::Pending } Some(Ok(msg)) => Poll::Ready(Some(Err(CdpError::UnexpectedWsMessage(msg)))),