diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js.rs index c852924bbf6..2ec8513baed 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js.rs @@ -3,7 +3,7 @@ mod external_process; use super::*; use external_process::*; use serde::de::DeserializeOwned; -use std::sync::atomic::AtomicU64; +use std::{collections::HashMap, sync::atomic::AtomicU64}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; pub(crate) async fn executor_process_request( diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs index 912a5e6d8ab..06d1551f940 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs @@ -140,6 +140,42 @@ impl Display for ExecutorProcessDiedError { impl StdError for ExecutorProcessDiedError {} +struct PendingRequests { + map: HashMap>>, + last_id: Option, +} + +impl PendingRequests { + fn new() -> Self { + Self { + map: HashMap::new(), + last_id: None, + } + } + + fn insert(&mut self, id: jsonrpc_core::Id, sender: oneshot::Sender>) { + self.map.insert(id.clone(), sender); + self.last_id = Some(id); + } + + fn respond(&mut self, id: &jsonrpc_core::Id, response: Result) { + self.map + .remove(id) + .expect("no sender for response") + .send(response) + .unwrap(); + } + + fn respond_to_last(&mut self, response: Result) { + let last_id = self + .last_id + .as_ref() + .expect("Expected last response to exist") + .to_owned(); + self.respond(&last_id, response); + } +} + pub(super) static EXTERNAL_PROCESS: Lazy = Lazy::new(RestartableExecutorProcess::new); type ReqImpl = ( @@ -173,7 +209,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { let mut stdout = BufReader::new(process.stdout.unwrap()).lines(); let mut stdin = process.stdin.unwrap(); - let mut last_pending_request: Option<(jsonrpc_core::Id, oneshot::Sender>)> = None; + let mut pending_requests = PendingRequests::new(); loop { tokio::select! { @@ -186,24 +222,20 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { Ok(Some(line)) => // new response { match serde_json::from_str::(&line) { - Ok(response) => { - let (id, sender) = last_pending_request.take().expect("got a response from the external process, but there was no pending request"); - if &id != response.id() { - unreachable!("got a response from the external process, but the id didn't match. Are you running with cargo tests with `--test-threads=1`"); - } - - match response { + Ok(ref response) => { + let res: Result = match response { jsonrpc_core::Output::Success(success) => { // The other end may be dropped if the whole // request future was dropped and not polled to // completion, so we ignore send errors here. - _ = sender.send(Ok(success.result)); + Ok(success.result.clone()) } jsonrpc_core::Output::Failure(err) => { tracing::error!("error response from jsonrpc: {err:?}"); - _ = sender.send(Err(Box::new(err.error))); + Err(Box::new(err.error.clone())) } - } + }; + pending_requests.respond(response.id(), res) } Err(err) => { tracing::error!(%err, "error when decoding response from child node process. Response was: `{}`", &line); @@ -214,9 +246,8 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { Ok(None) => // end of the stream { tracing::error!("Error when reading from child node process. Process might have exited. Restarting..."); - if let Some((_, sender)) = last_pending_request.take() { - sender.send(Err(Box::new(ExecutorProcessDiedError))).unwrap(); - } + + pending_requests.respond_to_last(Err(Box::new(ExecutorProcessDiedError))); EXTERNAL_PROCESS.restart().await; break; } @@ -233,7 +264,8 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { exit_with_message(1, "The json-rpc client channel was closed"); } Some((request, response_sender)) => { - last_pending_request = Some((request.id.clone(), response_sender)); + pending_requests.insert(request.id.clone(), response_sender); + let mut req = serde_json::to_vec(&request).unwrap(); req.push(b'\n'); stdin.write_all(&req).await.unwrap(); diff --git a/query-engine/driver-adapters/src/lib.rs b/query-engine/driver-adapters/src/lib.rs index 6be1270b326..c175942dc04 100644 --- a/query-engine/driver-adapters/src/lib.rs +++ b/query-engine/driver-adapters/src/lib.rs @@ -54,7 +54,7 @@ mod arch { } pub(crate) fn has_named_property(object: &JsObject, name: &str) -> JsResult { - Ok(Reflect::has(&object, &JsString::from_str(name).unwrap().into())?) + Reflect::has(object, &JsString::from_str(name).unwrap().into()) } pub(crate) fn to_rust_str(value: JsString) -> JsResult {