Skip to content

Commit

Permalink
Test fixes for NAPI tests (#4515)
Browse files Browse the repository at this point in the history
  • Loading branch information
Miguel Fernández authored Dec 5, 2023
1 parent 2a2565a commit 9a99fd2
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod external_process;
use super::*;
use external_process::*;
use serde::de::DeserializeOwned;
use std::sync::atomic::AtomicU64;
use std::{collections::HashMap, sync::atomic::AtomicU64};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

pub(crate) async fn executor_process_request<T: DeserializeOwned>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,42 @@ impl Display for ExecutorProcessDiedError {

impl StdError for ExecutorProcessDiedError {}

struct PendingRequests {
map: HashMap<jsonrpc_core::Id, oneshot::Sender<Result<serde_json::value::Value>>>,
last_id: Option<jsonrpc_core::Id>,
}

impl PendingRequests {
fn new() -> Self {
Self {
map: HashMap::new(),
last_id: None,
}
}

fn insert(&mut self, id: jsonrpc_core::Id, sender: oneshot::Sender<Result<serde_json::value::Value>>) {
self.map.insert(id.clone(), sender);
self.last_id = Some(id);
}

fn respond(&mut self, id: &jsonrpc_core::Id, response: Result<serde_json::value::Value>) {
self.map
.remove(id)
.expect("no sender for response")
.send(response)
.unwrap();
}

fn respond_to_last(&mut self, response: Result<serde_json::value::Value>) {
let last_id = self
.last_id
.as_ref()
.expect("Expected last response to exist")
.to_owned();
self.respond(&last_id, response);
}
}

pub(super) static EXTERNAL_PROCESS: Lazy<RestartableExecutorProcess> = Lazy::new(RestartableExecutorProcess::new);

type ReqImpl = (
Expand Down Expand Up @@ -173,7 +209,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {

let mut stdout = BufReader::new(process.stdout.unwrap()).lines();
let mut stdin = process.stdin.unwrap();
let mut last_pending_request: Option<(jsonrpc_core::Id, oneshot::Sender<Result<serde_json::value::Value>>)> = None;
let mut pending_requests = PendingRequests::new();

loop {
tokio::select! {
Expand All @@ -186,24 +222,20 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
Ok(Some(line)) => // new response
{
match serde_json::from_str::<jsonrpc_core::Output>(&line) {
Ok(response) => {
let (id, sender) = last_pending_request.take().expect("got a response from the external process, but there was no pending request");
if &id != response.id() {
unreachable!("got a response from the external process, but the id didn't match. Are you running with cargo tests with `--test-threads=1`");
}

match response {
Ok(ref response) => {
let res: Result<serde_json::value::Value> = match response {
jsonrpc_core::Output::Success(success) => {
// The other end may be dropped if the whole
// request future was dropped and not polled to
// completion, so we ignore send errors here.
_ = sender.send(Ok(success.result));
Ok(success.result.clone())
}
jsonrpc_core::Output::Failure(err) => {
tracing::error!("error response from jsonrpc: {err:?}");
_ = sender.send(Err(Box::new(err.error)));
Err(Box::new(err.error.clone()))
}
}
};
pending_requests.respond(response.id(), res)
}
Err(err) => {
tracing::error!(%err, "error when decoding response from child node process. Response was: `{}`", &line);
Expand All @@ -214,9 +246,8 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
Ok(None) => // end of the stream
{
tracing::error!("Error when reading from child node process. Process might have exited. Restarting...");
if let Some((_, sender)) = last_pending_request.take() {
sender.send(Err(Box::new(ExecutorProcessDiedError))).unwrap();
}

pending_requests.respond_to_last(Err(Box::new(ExecutorProcessDiedError)));
EXTERNAL_PROCESS.restart().await;
break;
}
Expand All @@ -233,7 +264,8 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
exit_with_message(1, "The json-rpc client channel was closed");
}
Some((request, response_sender)) => {
last_pending_request = Some((request.id.clone(), response_sender));
pending_requests.insert(request.id.clone(), response_sender);

let mut req = serde_json::to_vec(&request).unwrap();
req.push(b'\n');
stdin.write_all(&req).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion query-engine/driver-adapters/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ mod arch {
}

pub(crate) fn has_named_property(object: &JsObject, name: &str) -> JsResult<bool> {
Ok(Reflect::has(&object, &JsString::from_str(name).unwrap().into())?)
Reflect::has(object, &JsString::from_str(name).unwrap().into())
}

pub(crate) fn to_rust_str(value: JsString) -> JsResult<String> {
Expand Down

0 comments on commit 9a99fd2

Please sign in to comment.