From a4ed5a220840a92f250f573e82e8258a42afa0ed Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 26 Aug 2021 10:59:51 +0100 Subject: [PATCH] Refactor traverser to remove interior mutability (#114) Fixes #96 Closes #121 --- Cargo.lock | 44 + crates/apollo-json-ext/Cargo.toml | 11 + crates/apollo-json-ext/src/lib.rs | 453 +++++++++ crates/execution/Cargo.toml | 28 +- crates/execution/src/error.rs | 137 +++ crates/execution/src/federated.rs | 916 ++++++------------ crates/execution/src/http_service_registry.rs | 18 +- crates/execution/src/http_subgraph.rs | 42 +- crates/execution/src/json_utils.rs | 173 ---- crates/execution/src/lib.rs | 448 ++------- crates/execution/src/response.rs | 248 +++++ crates/execution/src/traverser.rs | 537 ---------- crates/query-planner/Cargo.toml | 8 +- crates/query-planner/src/caching.rs | 26 +- crates/query-planner/src/harmonizer.rs | 7 +- crates/query-planner/src/lib.rs | 16 +- crates/query-planner/src/model.rs | 31 +- crates/server/src/graph_factory.rs | 2 +- .../server/src/hyper_http_server_factory.rs | 33 +- crates/server/src/lib.rs | 2 +- 20 files changed, 1361 insertions(+), 1819 deletions(-) create mode 100644 crates/apollo-json-ext/Cargo.toml create mode 100644 crates/apollo-json-ext/src/lib.rs create mode 100644 crates/execution/src/error.rs delete mode 100644 crates/execution/src/json_utils.rs create mode 100644 crates/execution/src/response.rs delete mode 100644 crates/execution/src/traverser.rs diff --git a/Cargo.lock b/Cargo.lock index 3e287cc4c..965e837fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -41,6 +41,15 @@ version = "1.0.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "15af2628f6890fe2609a3b91bef4c83450512802e59489f9c1cb1fa5df064a61" +[[package]] +name = "apollo-json-ext" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "ascii-canvas" version = "3.0.0" @@ -179,6 +188,17 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "async-recursion" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-std" version = "1.9.0" @@ -620,6 +640,17 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "displaydoc" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bf95dc3f046b9da4f2d51833c0d3547d8564ef6910f5c1ed130306a75b92886" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -691,11 +722,14 @@ checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" name = "execution" version = "0.1.0-prealpha.1" dependencies = [ + "apollo-json-ext", + "async-recursion", "bytes", "configuration", "ctor", "derivative", "derive_more", + "displaydoc", "env_logger", "futures", "httpmock", @@ -708,6 +742,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "static_assertions", "thiserror", "tokio", "typed-builder", @@ -1926,12 +1961,15 @@ dependencies = [ name = "query-planner" version = "0.1.0-prealpha.1" dependencies = [ + "apollo-json-ext", "derive_more", "harmonizer", "insta", "mockall", + "parking_lot", "serde", "serde_json", + "static_assertions", "thiserror", ] @@ -2363,6 +2401,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "string_cache" version = "0.8.1" diff --git a/crates/apollo-json-ext/Cargo.toml b/crates/apollo-json-ext/Cargo.toml new file mode 100644 index 000000000..62f8c016a --- /dev/null +++ b/crates/apollo-json-ext/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "apollo-json-ext" +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = { version = "1", features = ["derive", "rc"] } +serde_json = "1" +thiserror = "1" diff --git a/crates/apollo-json-ext/src/lib.rs b/crates/apollo-json-ext/src/lib.rs new file mode 100644 index 000000000..8476e9711 --- /dev/null +++ b/crates/apollo-json-ext/src/lib.rs @@ -0,0 +1,453 @@ +use serde::{Deserialize, Serialize}; +use serde_json::map::Entry; +use serde_json::{Map, Value}; +use std::fmt; +use thiserror::Error; + +pub mod prelude { + pub use super::{Object, Path, ValueExt}; + pub use serde_json::Value; +} + +/// A JSON object. +pub type Object = Map; + +#[derive(Debug, Error)] +pub enum Error { + #[error("could not find path in JSON")] + PathNotFound, + #[error("attempt to flatten on non-array node")] + InvalidFlatten, +} + +/// Extension trait for [`serde_json::Value`]. +pub trait ValueExt { + /// Get a reference to the value(s) at a particular path. + #[track_caller] + fn get_at_path<'a, 'b>(&'a self, path: &'b Path) -> Result, Error>; + + /// Get a mutable reference to the value(s) at a particular path. + #[track_caller] + fn get_at_path_mut<'a, 'b>(&'a mut self, path: &'b Path) -> Result, Error>; + + /// Deep merge the JSON objects, array and override the values in `&mut self` if they already + /// exists. + #[track_caller] + fn deep_merge(&mut self, other: &Self); + + /// Returns `true` if the set is a subset of another, i.e., `other` contains at least all the + /// values in `self`. + #[track_caller] + fn is_subset(&self, superset: &Value) -> bool; +} + +impl ValueExt for Value { + fn get_at_path<'a, 'b>(&'a self, path: &'b Path) -> Result, Error> { + let mut current = vec![self]; + + for path_element in path.iter() { + current = match path_element { + PathElement::Flatten => current + .into_iter() + .map(|x| x.as_array().ok_or(Error::InvalidFlatten)) + .collect::, _>>()? + .into_iter() + .flatten() + .collect::>(), + PathElement::Index(index) if !(current.len() == 1 && current[0].is_array()) => { + vec![current.into_iter().nth(*index).ok_or(Error::PathNotFound)?] + } + path_element => current + .into_iter() + .map(|value| match (path_element, value) { + (PathElement::Key(key), value) if value.is_object() => value + .as_object() + .unwrap() + .get(key) + .ok_or(Error::PathNotFound), + (PathElement::Key(_), _) => Err(Error::PathNotFound), + (PathElement::Index(i), value) => value + .as_array() + .ok_or(Error::PathNotFound)? + .get(*i) + .ok_or(Error::PathNotFound), + (PathElement::Flatten, _) => unreachable!(), + }) + .collect::, _>>()?, + } + } + + Ok(current) + } + + fn get_at_path_mut<'a, 'b>(&'a mut self, path: &'b Path) -> Result, Error> { + let mut current = vec![self]; + + for path_element in path.iter() { + current = match path_element { + PathElement::Flatten => current + .into_iter() + .map(|x| x.as_array_mut().ok_or(Error::InvalidFlatten)) + .collect::, _>>()? + .into_iter() + .flatten() + .collect::>(), + PathElement::Index(index) if !(current.len() == 1 && current[0].is_array()) => { + vec![current.into_iter().nth(*index).ok_or(Error::PathNotFound)?] + } + path_element => current + .into_iter() + .map(|value| match (path_element, value) { + (PathElement::Key(key), value) if value.is_object() => value + .as_object_mut() + .unwrap() + .get_mut(key) + .ok_or(Error::PathNotFound), + (PathElement::Key(_), _) => Err(Error::PathNotFound), + (PathElement::Index(i), value) => value + .as_array_mut() + .ok_or(Error::PathNotFound)? + .get_mut(*i) + .ok_or(Error::PathNotFound), + (PathElement::Flatten, _) => unreachable!(), + }) + .collect::, _>>()?, + } + } + + Ok(current) + } + + fn deep_merge(&mut self, other: &Self) { + match (self, other) { + (Value::Object(a), Value::Object(b)) => { + for (key, value) in b.iter() { + match a.entry(key) { + Entry::Vacant(e) => { + e.insert(value.to_owned()); + } + Entry::Occupied(e) => { + e.into_mut().deep_merge(value); + } + } + } + } + (Value::Array(a), Value::Array(b)) => { + for (index, value) in a.iter_mut().enumerate() { + if let Some(b) = b.get(index) { + value.deep_merge(b); + } + } + } + (a, b) => { + *a = b.to_owned(); + } + } + } + + fn is_subset(&self, superset: &Value) -> bool { + match (self, superset) { + (Value::Object(subset), Value::Object(superset)) => { + subset.iter().all(|(key, value)| { + if let Some(other) = superset.get(key) { + value.is_subset(other) + } else { + false + } + }) + } + (Value::Array(subset), Value::Array(superset)) => { + subset.len() == superset.len() + && subset.iter().enumerate().all(|(index, value)| { + if let Some(other) = superset.get(index) { + value.is_subset(other) + } else { + false + } + }) + } + (a, b) => a == b, + } + } +} + +/// A GraphQL path element that is composes of strings or numbers. +/// e.g `/book/3/name` +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum PathElement { + /// A path element that given an array will flatmap the content. + #[serde( + deserialize_with = "deserialize_flatten", + serialize_with = "serialize_flatten" + )] + Flatten, + + /// An index path element. + Index(usize), + + /// A key path element. + Key(String), +} + +fn deserialize_flatten<'de, D>(deserializer: D) -> Result<(), D::Error> +where + D: serde::Deserializer<'de>, +{ + deserializer.deserialize_str(FlattenVisitor) +} + +struct FlattenVisitor; + +impl<'de> serde::de::Visitor<'de> for FlattenVisitor { + type Value = (); + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + write!(formatter, "a string that is '@'") + } + + fn visit_str(self, s: &str) -> Result + where + E: serde::de::Error, + { + if s == "@" { + Ok(()) + } else { + Err(serde::de::Error::invalid_value( + serde::de::Unexpected::Str(s), + &self, + )) + } + } +} + +fn serialize_flatten(serializer: S) -> Result +where + S: serde::Serializer, +{ + serializer.serialize_str("@") +} + +/// A path into the result document. This can be composed of strings and numbers +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Default)] +#[serde(transparent)] +pub struct Path(Vec); + +impl Path { + pub fn from_slice>(s: &[T]) -> Self { + Self( + s.iter() + .map(|x| x.as_ref()) + .map(|s| { + if let Ok(index) = s.parse::() { + PathElement::Index(index) + } else if s == "@" { + PathElement::Flatten + } else { + PathElement::Key(s.to_string()) + } + }) + .collect(), + ) + } + + pub fn iter(&self) -> impl Iterator { + self.0.iter() + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + pub fn len(&self) -> usize { + self.0.len() + } + + pub fn empty() -> Path { + Path(Default::default()) + } + + pub fn parent(&self) -> Option { + if self.is_empty() { + None + } else { + Some(Path(self.iter().cloned().take(self.len() - 1).collect())) + } + } + + pub fn join(&self, other: impl AsRef) -> Self { + let other = other.as_ref(); + let mut new = Vec::with_capacity(self.len() + other.len()); + new.extend(self.iter().cloned()); + new.extend(other.iter().cloned()); + Path(new) + } +} + +impl AsRef for Path { + fn as_ref(&self) -> &Path { + self + } +} + +impl From for Path +where + T: AsRef, +{ + fn from(s: T) -> Self { + Self( + s.as_ref() + .split('/') + .map(|s| { + if let Ok(index) = s.parse::() { + PathElement::Index(index) + } else if s == "@" { + PathElement::Flatten + } else { + PathElement::Key(s.to_string()) + } + }) + .collect(), + ) + } +} + +impl fmt::Display for Path { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for element in self.iter() { + write!(f, "/")?; + match element { + PathElement::Index(index) => write!(f, "{}", index)?, + PathElement::Key(key) => write!(f, "{}", key)?, + PathElement::Flatten => write!(f, "@")?, + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + macro_rules! assert_is_subset { + ($a:expr, $b:expr $(,)?) => { + assert!($a.is_subset(&$b)); + }; + } + + macro_rules! assert_is_not_subset { + ($a:expr, $b:expr $(,)?) => { + assert!(!$a.is_subset(&$b)); + }; + } + + #[test] + fn test_get_at_path() { + let mut json = json!({"obj":{"arr":[{"prop1":1},{"prop1":2}]}}); + let path = Path::from("obj/arr/1/prop1"); + let result = json.get_at_path(&path).unwrap(); + assert_eq!(result, vec![&Value::Number(2.into())]); + let result_mut = json.get_at_path_mut(&path).unwrap(); + assert_eq!(result_mut, vec![&mut Value::Number(2.into())]); + } + + #[test] + fn test_get_at_path_flatmap() { + let mut json = json!({"obj":{"arr":[{"prop1":1},{"prop1":2}]}}); + let path = Path::from("obj/arr/@"); + let result = json.get_at_path(&path).unwrap(); + assert_eq!(result, vec![&json!({"prop1":1}), &json!({"prop1":2})]); + let result_mut = json.get_at_path_mut(&path).unwrap(); + assert_eq!( + result_mut, + vec![&mut json!({"prop1":1}), &mut json!({"prop1":2})] + ); + } + + #[test] + fn test_get_at_path_flatmap_nested() { + let mut json = json!({ + "obj": { + "arr": [ + { + "prop1": [ + {"prop2": {"prop3": 1}, "prop4": -1}, + {"prop2": {"prop3": 2}, "prop4": -2}, + ], + }, + { + "prop1": [ + {"prop2": {"prop3": 3}, "prop4": -3}, + {"prop2": {"prop3": 4}, "prop4": -4}, + ], + }, + ], + }, + }); + let path = Path::from("obj/arr/@/prop1/@/prop2"); + let result = json.get_at_path(&path).unwrap(); + assert_eq!( + result, + vec![ + &json!({"prop3":1}), + &json!({"prop3":2}), + &json!({"prop3":3}), + &json!({"prop3":4}), + ], + ); + let result_mut = json.get_at_path_mut(&path).unwrap(); + assert_eq!( + result_mut, + vec![ + &mut json!({"prop3":1}), + &mut json!({"prop3":2}), + &mut json!({"prop3":3}), + &mut json!({"prop3":4}), + ], + ); + } + + #[test] + fn test_deep_merge() { + let mut json = json!({"obj":{"arr":[{"prop1":1},{"prop2":2}]}}); + json.deep_merge(&json!({"obj":{"arr":[{"prop1":2,"prop3":3},{"prop4":4}]}})); + assert_eq!( + json, + json!({"obj":{"arr":[{"prop1":2, "prop3":3},{"prop2":2, "prop4":4}]}}) + ); + } + + #[test] + fn test_is_subset_eq() { + assert_is_subset!( + json!({"obj":{"arr":[{"prop1":1},{"prop4":4}]}}), + json!({"obj":{"arr":[{"prop1":1},{"prop4":4}]}}), + ); + } + + #[test] + fn test_is_subset_missing_pop() { + assert_is_subset!( + json!({"obj":{"arr":[{"prop1":1},{"prop4":4}]}}), + json!({"obj":{"arr":[{"prop1":1,"prop3":3},{"prop4":4}]}}), + ); + } + + #[test] + fn test_is_subset_array_lengths_differ() { + assert_is_not_subset!( + json!({"obj":{"arr":[{"prop1":1}]}}), + json!({"obj":{"arr":[{"prop1":1,"prop3":3},{"prop4":4}]}}), + ); + } + + #[test] + fn test_is_subset_extra_prop() { + assert_is_not_subset!( + json!({"obj":{"arr":[{"prop1":1,"prop3":3},{"prop4":4}]}}), + json!({"obj":{"arr":[{"prop1":1},{"prop4":4}]}}), + ); + } +} diff --git a/crates/execution/Cargo.toml b/crates/execution/Cargo.toml index 3f9e1f8bb..3be48120a 100644 --- a/crates/execution/Cargo.toml +++ b/crates/execution/Cargo.toml @@ -8,28 +8,32 @@ license-file = "./LICENSE" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -query-planner = { path = "../query-planner" } +apollo-json-ext = { path = "../apollo-json-ext" } +async-recursion = "0.3" +bytes = "1.1.0" configuration = { path = "../configuration" } +derivative = "2.2.0" +derive_more = "0.99.16" +displaydoc = "0.2" +env_logger = "0.9.0" futures = "0.3.16" -serde_json = "1.0.66" +log = "0.4.14" +mockall = {version = "0.10.1", optional = true} +parking_lot = "0.11.1" +query-planner = { path = "../query-planner" } +reqwest = { version = "0.11.4", features = ["json", "stream"] } serde = { version = "1.0.129", features = ["derive", "rc"] } -derive_more = "0.99.16" +serde_json = "1.0.66" +serde_yaml = "0.8.19" thiserror = "1.0.26" -reqwest = { version = "0.11.4", features = ["json", "stream"] } tokio = { version = "1.10.1", features = ["full"] } -bytes = "1.1.0" -derivative = "2.2.0" -serde_yaml = "0.8.19" -log = "0.4.14" -env_logger = "0.9.0" -parking_lot = "0.11.1" typed-builder = "0.9.0" -mockall = {version = "0.10.1", optional = true} [dev-dependencies] +ctor = "0.1.20" httpmock = "0.6.2" maplit = "1.0.2" -ctor = "0.1.20" +static_assertions = "1" [features] mocks = ["mockall"] diff --git a/crates/execution/src/error.rs b/crates/execution/src/error.rs new file mode 100644 index 000000000..66a2b576a --- /dev/null +++ b/crates/execution/src/error.rs @@ -0,0 +1,137 @@ +use crate::GraphQLResponse; +use apollo_json_ext::prelude::*; +use displaydoc::Display; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +/// Error types for execution. Note that these are not actually returned to the client, but are +/// instead converted to Json for GraphQLError +#[derive(Error, Display, Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +#[ignore_extra_doc_attributes] +pub enum FetchError { + /// Query references unknown service '{service}'. + ValidationUnknownServiceError { + /// The service that was unknown. + service: String, + }, + + /// Query requires variable '{name}', but it was not provided. + ValidationMissingVariable { + /// Name of the variable. + name: String, + }, + + /// Query could not be planned: {reason} + ValidationPlanningError { + /// The failure reason. + reason: String, + }, + + /// Response was malformed: {reason} + MalformedResponse { + /// The reason the serialization failed. + reason: String, + }, + + /// Service '{service}' returned no response. + SubrequestNoResponse { + /// The service that returned no response. + service: String, + }, + + /// Service '{service}' response was malformed: {reason} + SubrequestMalformedResponse { + /// The service that responded with the malformed response. + service: String, + + /// The reason the serialization failed. + reason: String, + }, + + /// Service '{service}' returned a PATCH response which was not expected. + SubrequestUnexpectedPatchResponse { + /// The service that returned the PATCH response. + service: String, + }, + + /// HTTP fetch failed from '{service}': {reason} + /// + /// Note that this relates to a transport error and not a GraphQL error. + SubrequestHttpError { + /// The service failed. + service: String, + + /// The reason the fetch failed. + reason: String, + }, + + /// Subquery requires field '{field}' but it was not found in the current response. + ExecutionFieldNotFound { + /// The field that is not found. + field: String, + }, + + /// Invalid content: {reason} + ExecutionInvalidContent { reason: String }, + + /// Could find path: {reason} + ExecutionPathNotFound { reason: String }, +} + +impl FetchError { + /// Convert the fetch error to a GraphQL error. + pub fn to_graphql_error(&self, path: Option) -> GraphQLError { + GraphQLError { + message: self.to_string(), + locations: Default::default(), + path: path.unwrap_or_default(), + extensions: serde_json::to_value(self) + .unwrap() + .as_object() + .unwrap() + .to_owned(), + } + } + + /// Convert the error to an appropriate response. + pub fn to_response(&self, primary: bool) -> GraphQLResponse { + GraphQLResponse { + label: Default::default(), + data: Default::default(), + path: Default::default(), + has_next: primary.then(|| false), + errors: vec![self.to_graphql_error(None)], + extensions: Default::default(), + } + } +} + +/// {message} +#[derive(Error, Display, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GraphQLError { + /// The error message. + pub message: String, + + /// The locations of the error from the originating request. + pub locations: Vec, + + /// The path of the error. + pub path: Path, + + /// The optional graphql extensions. + #[serde(default, skip_serializing_if = "Object::is_empty")] + pub extensions: Object, +} + +/// A location in the request that triggered a graphql error. +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Location { + /// The line number. + pub line: i32, + + /// The column number. + pub column: i32, +} diff --git a/crates/execution/src/federated.rs b/crates/execution/src/federated.rs index be8b8b50e..927db0511 100644 --- a/crates/execution/src/federated.rs +++ b/crates/execution/src/federated.rs @@ -1,536 +1,25 @@ -use std::collections::HashSet; -use std::pin::Pin; -use std::sync::Arc; - -use futures::lock::Mutex; -use futures::prelude::*; -use serde_json::{Map, Value}; - -use query_planner::model::{FetchNode, FlattenNode, PlanNode, QueryPlan, SelectionSet}; -use query_planner::{QueryPlanOptions, QueryPlanner, QueryPlannerError}; - -use crate::traverser::Traverser; use crate::{ - FetchError, GraphQLFetcher, GraphQLPrimaryResponse, GraphQLRequest, GraphQLResponse, - GraphQLResponseStream, Path, PathElement, ServiceRegistry, + FetchError, GraphQLFetcher, GraphQLRequest, GraphQLResponse, GraphQLResponseStream, + ServiceRegistry, }; -use futures::{FutureExt, StreamExt}; - -type TraverserStream = Pin + Send>>; -type EmptyFuture = Pin + Send>>; - -/// Federated graph fetcher creates a query plan and executes the plan against one or more -/// subgraphs. For information on how the algorithm works refer to the README for this crate. -#[derive(Clone, Debug)] -pub struct FederatedGraph { - query_planner: Arc>, - service_registry: Arc, - concurrency_factor: usize, - chunk_size: usize, -} - -impl FederatedGraph { - /// Create a new federated graph fetcher. - /// query_planner is shared between threads and requires a lock for planning: - /// 1. query planners may be mutable for caching state. - /// 2. we can clone FederatedGraph for use across threads so we can make use of syntax. - /// - /// service_registry is shared between threads, but is send and sync and therefore does not need - /// a mutex. - /// - /// concurrency_factor and chunk_size are not exposed right now. Setting chunk_size to 1 has - /// the effect of serializing the execution in a predictable order, which can be useful for - /// debugging. - /// - /// In future we may allow concurrency_factor and chunk_size to be set explicitly to allow - /// clients to avoid stalled execution at the cost of making more downstream calls. - /// - /// # Arguments - /// - /// * `query_planner`: The query planner to use to for planning. - /// * `service_registry`: The registry of service name to fetcher. - /// - /// returns: FederatedGraph - /// - pub fn new(query_planner: T, service_registry: Arc) -> Self - where - T: QueryPlanner + 'static, - { - Self { - concurrency_factor: 100000, - chunk_size: 100000, - query_planner: Arc::new(Mutex::new(query_planner)), - service_registry, - } - } - - /// Create a query plan via the query planner - /// - /// # Arguments - /// - /// * `request`: The request to be planned. - /// - /// returns: Result - /// - async fn plan(self, request: Arc) -> Result { - let mut query_planner = self.query_planner.lock().await; - let query_plan = query_planner.get( - request.query.to_owned(), - request.operation_name.to_owned(), - QueryPlanOptions::default(), - )?; - - Ok(query_plan) - } - - /// Visit a stream of traversers with a plan node. - /// Dispatches the visit to the fetch, sequence, parallel, or flatten operations. - /// - /// # Arguments - /// - /// * `traversers`: The stream of traversers to process. - /// * `node`: The query plan node. - /// * `request`: The GraphQL original request. - /// - /// returns Pin + Send>> - fn visit( - self, - traversers: TraverserStream, - node: PlanNode, - request: Arc, - ) -> EmptyFuture { - let concurrency_factor = self.concurrency_factor; - - let variables = match node { - PlanNode::Fetch(ref fetch) if fetch.requires.is_none() => Arc::new( - fetch - .variable_usages - .iter() - .filter_map(|key| { - request - .variables - .get(key) - .map(|value| (key.to_owned(), value.to_owned())) - }) - .collect::>(), - ), - _ => Default::default(), - }; - - traversers - .chunks(self.chunk_size) - .map(move |traversers| { - let traverser_stream = stream::iter(traversers).boxed(); - let clone = self.to_owned(); - match node.to_owned() { - PlanNode::Sequence { nodes } => { - clone.visit_sequence(traverser_stream, nodes, request.clone()) - } - PlanNode::Parallel { nodes } => { - clone.visit_parallel(traverser_stream, nodes, request.clone()) - } - PlanNode::Fetch(fetch) if fetch.requires.is_none() => { - clone.visit_fetch_no_select(traverser_stream, fetch, variables.clone()) - } - PlanNode::Fetch(fetch) => { - clone.visit_fetch_select(traverser_stream, fetch, request.clone()) - } - PlanNode::Flatten(flatten) => { - clone.visit_flatten(traverser_stream, flatten, request.clone()) - } - } - .boxed() - }) - .buffer_unordered(concurrency_factor) - .for_each(|_| future::ready(())) - .boxed() - } - - /// Fetch where the plan node has a selection. - /// Selection fetches are performed in bulk and the results are merged back into the originating - /// traverser. - /// - /// For each traverser we try and obtain data from the content that matches the selection. - /// Any traversers that do not match anything are dropped. - /// - /// The selections are aggregated and sent to the downstream service, the result is merged back - /// in with the originating traverser. - /// - /// # Arguments - /// - /// * `traversers`: The stream of traversers to process. - /// * `fetch`: The fetch plan node. - /// - /// returns Pin + Send>> - /// - fn visit_fetch_select( - self, - traversers: TraverserStream, - fetch: FetchNode, - request: Arc, - ) -> EmptyFuture { - traversers - .collect::>() - .map(move |traversers| { - let service_name = fetch.service_name.to_owned(); - // We already checked that the service exists during planning - let fetcher = self.service_registry.get(&service_name).unwrap(); - let (mut traversers, selections) = - traversers_with_selections(&fetch.requires, traversers); - - let mut variables = Map::with_capacity(1 + fetch.variable_usages.len()); - variables.extend(fetch.variable_usages.iter().filter_map(|key| { - request - .variables - .get(key) - .map(|value| (key.to_owned(), value.to_owned())) - })); - variables.insert( - "representations".into(), - construct_representations(selections), - ); - - fetcher - .stream( - GraphQLRequest::builder() - .query(fetch.operation) - .variables(variables) - .build(), - ) - .into_future() - .map(move |(primary, _rest)| match primary { - // If we got results we zip the stream up with the original traverser and merge the results. - Some(GraphQLResponse::Primary(primary)) => { - merge_response(&mut traversers, primary); - } - Some(GraphQLResponse::Patch(_)) => { - traversers.iter_mut().for_each(|t| { - t.add_error(&FetchError::SubrequestMalformedResponse { - service: service_name.to_owned(), - reason: "Subrequest sent patch response as primary".to_string(), - }) - }); - } - _ => { - traversers.iter_mut().for_each(|t| { - t.add_error(&FetchError::SubrequestNoResponse { - service: service_name.to_owned(), - }) - }); - } - }) - .boxed() - }) - .flatten() - .boxed() - } - - /// Perform a fetch with no selections. - /// Without selections the queries for each traverser must be made independently and cannot be - /// batched. - /// - /// In practice non selection queries are likely to happen only at the top level of a query plan - /// and will therefore only have one traverser. - /// - /// If a non-selection query does happen at a lower level with multiple traversers the requests - /// happen in parallel. - /// - /// # Arguments - /// - /// * `traversers`: The traversers to process. - /// * `fetch`: The fetch node. - /// - /// returns Pin + Send>> - /// - fn visit_fetch_no_select( - self, - traversers: TraverserStream, - fetch: FetchNode, - variables: Arc>, - ) -> EmptyFuture { - let concurrency_factor = self.concurrency_factor; - traversers - .map(move |mut traverser| { - let service_name = fetch.service_name.to_owned(); - // We already validated that the service exists during planning - let fetcher = self.service_registry.get(&service_name).unwrap(); - - fetcher - .stream( - GraphQLRequest::builder() - .query(fetch.operation.clone()) - .variables(variables.clone()) - .build(), - ) - .into_future() - .map(move |(primary, _rest)| match primary { - Some(GraphQLResponse::Primary(primary)) => { - traverser.merge(Some(&Value::Object(primary.data))); - } - Some(GraphQLResponse::Patch(_)) => { - panic!("Should not have had patch response as primary!") - } - None => traverser.add_error(&FetchError::SubrequestNoResponse { - service: service_name, - }), - }) - .boxed() - }) - .buffered(concurrency_factor) - .for_each(|_| future::ready(())) - .boxed() - } - - /// Visit a sequence of plan nodes in turn. - /// Execution waits for the previous operations to complete before executing the next operation - /// in the query plan. - /// - /// # Arguments - /// - /// * `traversers`: The stream of traversers to process. - /// * `nodes`: The plan nodes in the sequence. - /// - /// returns Pin + Send>> - fn visit_sequence( - self, - traversers: TraverserStream, - nodes: Vec, - request: Arc, - ) -> EmptyFuture { - traversers - .collect::>() - .map(move |traversers| { - // We now have a chunk of traversers - nodes - .iter() - .fold(future::ready(()).boxed(), |acc, node| { - let next = self - .to_owned() - .visit( - stream::iter(traversers.to_owned()).boxed(), - node.to_owned(), - request.clone(), - ) - .boxed(); - - acc.then(|_| next).boxed() - }) - .boxed() - }) - .flatten() - .boxed() - } - - /// Visit a set of plan nodes in parallel. - /// Execution of all child operations happens in parallel, however the parallel operation cannot - /// complete until all child operations have completed. - /// - /// With large chunk sizes there is the potential that a stalled operation will stall the - /// entire pipeline. - /// - /// # Arguments - /// - /// * `traversers`: The stream of traversers to process. - /// * `nodes`: The pan nodes to execute in parallel. - /// - /// returns Pin + Send>> - fn visit_parallel( - self, - traversers: TraverserStream, - nodes: Vec, - request: Arc, - ) -> EmptyFuture { - traversers - .collect::>() - .map(move |traversers| { - // We now have a chunk of traversers - // For each parallel branch we send clones of those traversers through the pipeline - let tasks = nodes - .iter() - .map(move |node| { - self.to_owned().visit( - stream::iter(traversers.to_owned()).boxed(), - node.to_owned(), - request.clone(), - ) - }) - .collect::>(); - future::join_all(tasks).map(|_| ()) - }) - .flatten() - .boxed() - } - - /// Visit a flatten plan node. - /// Given a traverser this will create a stream of traversers that match the path provided in - /// the plan. - /// - /// For instance given: - /// ```json - /// { - /// 'a': { - /// 'b':[{'c':1}, {'c':2}] - /// } - /// ``` - /// a traverser at path `a` - /// and a plan path of `b/@/c` - /// The traversers generated will be: - /// `a/b/0/c' and `a/b/1/c' - /// - /// # Arguments - /// - /// * `traversers`: The stream of traversers to process. - /// * `flatten`: The flatten plan node. - /// - /// returns Pin + Send>> - /// - fn visit_flatten( - &self, - traversers: TraverserStream, - flatten: FlattenNode, - request: Arc, - ) -> EmptyFuture { - let path = Path::parse(flatten.path.join("/")); - let expanded = traversers - .flat_map(move |traverser| traverser.stream_descendants(&path)) - .boxed(); - self.to_owned().visit(expanded, *flatten.node, request) - } -} - -impl GraphQLFetcher for FederatedGraph { - fn stream(&self, request: GraphQLRequest) -> GraphQLResponseStream { - let request = Arc::new(request); - let clone = self.clone(); - - self.clone() - .plan(request.clone()) - .map(move |plan| match plan { - Ok(QueryPlan { node: Some(root) }) => { - let mut start = Traverser::new(request.clone()); - - start.add_errors(&validate_services_against_plan( - clone.service_registry.to_owned(), - &root, - )); - start.add_errors(&validate_request_variables_against_plan( - request.to_owned(), - &root, - )); - - // If we have any errors so far then let's abort the query - // Planning/validation/variables are candidates to abort. - if start.has_errors() { - return stream::iter(vec![start.to_primary()]).boxed(); - } - - clone - .visit(stream::iter(vec![start.to_owned()]).boxed(), root, request) - .map(move |_| stream::iter(vec![start.to_primary()])) - .flatten_stream() - .boxed() - } - Ok(_) => stream::empty().boxed(), - Err(err) => stream::iter(vec![err.to_primary()]).boxed(), - }) - .into_stream() - .flatten() - .boxed() - } -} - -impl From for FetchError { - fn from(err: QueryPlannerError) -> Self { - FetchError::ValidationPlanningError { - reason: err.to_string(), - } - } -} - -/// Given a vec of selections merge them into an array value. -/// -/// # Arguments -/// -/// * `selections`: The selections to merge. -/// -/// returns: Value -/// -fn construct_representations(selections: Vec) -> Value { - Value::Array(selections.iter().map(|value| value.to_owned()).collect()) -} - -/// Get the list of traversers and corresponding selections for sending to a downstream service. -/// Any traverser that does not result in a selection will be dropped. -/// -/// # Arguments -/// -/// * `fetch`: The fetch node that defines the -/// * `traversers`: The vec of traversers to process. -/// -/// returns: (Vec, Vec) -/// -fn traversers_with_selections( - requires: &Option, - mut traversers: Vec, -) -> (Vec, Vec) { - traversers - .iter_mut() - .map(|traverser| (traverser.to_owned(), traverser.select(requires))) - .filter_map(|(mut traverser, selection)| match selection { - Ok(Some(x)) => Some((traverser, x)), - Ok(None) => None, - Err(err) => { - traverser.add_error(&err); - None - } - }) - .unzip() -} - -/// Merge the results of a selection query with the originating traverser. -/// Each result is paired with the originating traverser before merging. -/// -/// # Arguments -/// -/// * `traversers`: The vec of traversers to merge with. -/// * `primary`: The response from the downstream server -/// -/// returns: Vec -/// -fn merge_response(traversers: &mut [Traverser], primary: GraphQLPrimaryResponse) { - if let Some(Value::Array(array)) = primary.data.get("_entities") { - traversers - .iter() - .zip(array.iter()) - .for_each(|(traverser, result)| { - traverser.to_owned().merge(Some(result)); - }); - } - //We may have some errors that relate to entities. Find them and add them to the appropriate - //traverser - for mut err in primary.errors { - if err.path[0].eq(&PathElement::Key("_entities".to_string())) { - if let PathElement::Index(index) = err.path[1] { - err.path.splice(0..2, vec![]); - traversers[index].add_graphql_error(&err); - } - } else { - log::error!("Subquery had errors that did not map to entities."); - } - } -} +use apollo_json_ext::prelude::*; +use async_recursion::async_recursion; +use futures::lock::Mutex; +use futures::prelude::*; +use query_planner::model::{FetchNode, FlattenNode, PlanNode, QueryPlan}; +use query_planner::QueryPlanner; +use std::collections::HashSet; +use std::sync::Arc; /// Recursively validate a query plan node making sure that all services are known before we go /// for execution. +/// /// This simplifies processing later as we can always guarantee that services are configured for -/// the plan. +/// the plan. /// /// # Arguments /// -/// * `plan`: The root query plan node to validate. -/// -/// returns: Result<(), FetchError> -/// +/// * `plan`: The root query plan node to validate. fn validate_services_against_plan( service_registry: Arc, plan: &PlanNode, @@ -545,6 +34,15 @@ fn validate_services_against_plan( .collect::>() } +/// Recursively validate a query plan node making sure that all variable usages are known before we +/// go for execution. +/// +/// This simplifies processing later as we can always guarantee that the variable usages are +/// available for the plan. +/// +/// # Arguments +/// +/// * `plan`: The root query plan node to validate. fn validate_request_variables_against_plan( request: Arc, plan: &PlanNode, @@ -563,24 +61,295 @@ fn validate_request_variables_against_plan( .collect::>() } -#[cfg(test)] -mod tests { - use std::collections::hash_map::Entry; - use std::collections::HashMap; +pub struct FederatedGraph { + query_planner: Box, + service_registry: Arc, +} - use futures::prelude::*; - use maplit::hashmap; - use serde_json::json; - use serde_json::to_string_pretty; +impl FederatedGraph { + /// Create a `FederatedGraph` instance used to execute a GraphQL query. + pub fn new( + query_planner: Box, + service_registry: Arc, + ) -> Self { + Self { + query_planner, + service_registry, + } + } +} - use configuration::Configuration; - use query_planner::harmonizer::HarmonizerQueryPlanner; +impl GraphQLFetcher for FederatedGraph { + fn stream(&self, request: GraphQLRequest) -> GraphQLResponseStream { + let plan = match self.query_planner.get( + request.query.to_owned(), + request.operation_name.to_owned(), + Default::default(), + ) { + Ok(QueryPlan { node: Some(root) }) => root, + Ok(_) => return stream::empty().boxed(), + Err(err) => return stream::iter(vec![FetchError::from(err).to_response(true)]).boxed(), + }; + let service_registry = self.service_registry.clone(); + let request = Arc::new(request); - use crate::http_service_registry::HttpServiceRegistry; - use crate::http_subgraph::HttpSubgraphFetcher; - use crate::json_utils::is_subset; + let mut early_errors = Vec::new(); + + for err in validate_services_against_plan(service_registry.clone(), &plan) { + early_errors.push(err.to_graphql_error(None)); + } + + for err in validate_request_variables_against_plan(request.clone(), &plan) { + early_errors.push(err.to_graphql_error(None)); + } + + // If we have any errors so far then let's abort the query + // Planning/validation/variables are candidates to abort. + if !early_errors.is_empty() { + return stream::once( + async move { GraphQLResponse::builder().errors(early_errors).build() }, + ) + .boxed(); + } + + stream::once(async move { + let response = Arc::new(Mutex::new(GraphQLResponse::builder().build())); + let root = Path::empty(); + + execute( + response.clone(), + &root, + &plan, + request, + service_registry.clone(), + ) + .await; + + // TODO: this is not great but there is no other way + Arc::try_unwrap(response) + .expect("todo: how to prove?") + .into_inner() + }) + .boxed() + } +} +#[async_recursion] +async fn execute( + response: Arc>, + current_dir: &Path, + plan: &PlanNode, + request: Arc, + service_registry: Arc, +) { + log::trace!("Executing plan:\n{:#?}", plan); + + match plan { + PlanNode::Sequence { nodes } => { + for node in nodes { + execute( + response.clone(), + current_dir, + node, + request.clone(), + service_registry.clone(), + ) + .await; + } + } + PlanNode::Parallel { nodes } => { + future::join_all(nodes.iter().map(|plan| { + execute( + response.clone(), + current_dir, + plan, + request.clone(), + service_registry.clone(), + ) + })) + .await; + } + PlanNode::Fetch(info) => { + match fetch_node( + response.clone(), + ¤t_dir, + info, + request.clone(), + service_registry.clone(), + ) + .await + { + Ok(()) => { + log::trace!( + "New data:\n{}", + serde_json::to_string_pretty(&response.lock().await.data).unwrap(), + ); + } + Err(err) => { + debug_assert!(false, "{}", err); + log::error!("Fetch error: {}", err); + response + .lock() + .await + .errors + .push(err.to_graphql_error(Some(current_dir.to_owned()))); + } + } + } + PlanNode::Flatten(FlattenNode { path, node }) => { + // this is the only command that actually changes the "current dir" + let current_dir = current_dir.join(path); + execute( + response.clone(), + // a path can go over multiple json node! + ¤t_dir, + node, + request.clone(), + service_registry.clone(), + ) + .await; + } + } +} + +async fn fetch_node<'a>( + response: Arc>, + current_dir: &'a Path, + FetchNode { + variable_usages, + requires, + operation, + service_name, + }: &'a FetchNode, + request: Arc, + service_registry: Arc, +) -> Result<(), FetchError> { + if let Some(requires) = requires { + // We already checked that the service exists during planning + let fetcher = service_registry.get(service_name).unwrap(); + + let mut variables = Object::with_capacity(1 + variable_usages.len()); + variables.extend(variable_usages.iter().filter_map(|key| { + request + .variables + .get(key) + .map(|value| (key.to_owned(), value.to_owned())) + })); + + { + let response = response.lock().await; + log::trace!( + "Creating representations at path '{}' for selections={:?} using data={}", + current_dir, + requires, + serde_json::to_string(&response.data).unwrap(), + ); + let representations = response.select(current_dir, requires)?; + variables.insert("representations".into(), representations); + } + + let (res, _tail) = fetcher + .stream( + GraphQLRequest::builder() + .query(operation) + .variables(variables) + .build(), + ) + .into_future() + .await; + + match res { + Some(response) if !response.is_primary() => { + Err(FetchError::SubrequestUnexpectedPatchResponse { + service: service_name.to_owned(), + }) + } + Some(GraphQLResponse { data, .. }) => { + if let Some(entities) = data.get("_entities") { + log::trace!( + "Received entities: {}", + serde_json::to_string(entities).unwrap(), + ); + if let Some(array) = entities.as_array() { + let mut response = response.lock().await; + + for (i, entity) in array.iter().enumerate() { + response.insert_data( + ¤t_dir.join(Path::from(i.to_string())), + entity.to_owned(), + )?; + } + + Ok(()) + } else { + Err(FetchError::ExecutionInvalidContent { + reason: "Received invalid type for key `_entities`!".to_string(), + }) + } + } else { + Err(FetchError::ExecutionInvalidContent { + reason: "Missing key `_entities`!".to_string(), + }) + } + } + None => Err(FetchError::SubrequestNoResponse { + service: service_name.to_string(), + }), + } + } else { + let variables = Arc::new( + variable_usages + .iter() + .filter_map(|key| { + request + .variables + .get(key) + .map(|value| (key.to_owned(), value.to_owned())) + }) + .collect::(), + ); + + // We already validated that the service exists during planning + let fetcher = service_registry.get(service_name).unwrap(); + + let (res, _tail) = fetcher + .stream( + GraphQLRequest::builder() + .query(operation.clone()) + .variables(variables.clone()) + .build(), + ) + .into_future() + .await; + + match res { + Some(response) if !response.is_primary() => { + Err(FetchError::SubrequestUnexpectedPatchResponse { + service: service_name.to_owned(), + }) + } + Some(GraphQLResponse { data, .. }) => { + response.lock().await.insert_data(¤t_dir, data)?; + Ok(()) + } + None => Err(FetchError::SubrequestNoResponse { + service: service_name.to_string(), + }), + } + } +} + +#[cfg(test)] +mod tests { use super::*; + use crate::http_service_registry::HttpServiceRegistry; + use crate::http_subgraph::HttpSubgraphFetcher; + use configuration::Configuration; + use maplit::hashmap; + use query_planner::harmonizer::HarmonizerQueryPlanner; + use serde_json::to_string_pretty; + use std::collections::hash_map::Entry; + use std::collections::HashMap; #[ctor::ctor] fn init() { @@ -589,6 +358,7 @@ mod tests { macro_rules! assert_federated_response { ($query:expr, $service_requests:expr $(,)?) => { + init(); let request = GraphQLRequest::builder() .query($query) .variables(Arc::new( @@ -600,77 +370,22 @@ mod tests { .collect(), )) .build(); - let mut expected = query_node(request.clone()); let (mut actual, registry) = query_rust(request.clone()); + let mut expected = query_node(request.clone()); - let actual = actual.next().await.unwrap().primary(); - let expected = expected.next().await.unwrap().primary(); - log::debug!("{}", to_string_pretty(&actual).unwrap()); - log::debug!("{}", to_string_pretty(&expected).unwrap()); + let actual = actual.next().await.unwrap(); + let expected = expected.next().await.unwrap(); + eprintln!("actual: {}", to_string_pretty(&actual).unwrap()); + eprintln!("expected: {}", to_string_pretty(&expected).unwrap()); // The current implementation does not cull extra properties that should not make is to the // output yet, so we check that the nodejs implementation returns a subset of the // output of the rust output. - assert!(is_subset( - &Value::Object(expected.data), - &Value::Object(actual.data) - )); + assert!(expected.data.is_subset(&actual.data)); assert_eq!(registry.totals(), $service_requests); }; } - #[tokio::test] - async fn test_merge_response() { - let mut traverser = Traverser::new(Arc::new(GraphQLRequest::builder().query("").build())); - traverser.merge(Some(&json!({"arr":[{}, {}]}))); - - let mut children = traverser - .stream_descendants(&Path::parse("arr/@")) - .collect::>() - .await; - merge_response( - &mut children, - GraphQLPrimaryResponse { - data: json!({ - "_entities": [ - {"prop1": "val1"}, - {"prop1": "val2"}, - ] - }) - .as_object() - .unwrap() - .to_owned(), - has_next: false, - errors: vec![FetchError::MalformedResponse { - reason: "Something".to_string(), - } - .to_graphql_error(Some(Path::parse("_entities/1")))], - extensions: Default::default(), - }, - ); - - assert_eq!( - traverser.to_primary().primary(), - GraphQLPrimaryResponse { - data: json!({ - "arr": [ - {"prop1": "val1"}, - {"prop1": "val2"}, - ], - }) - .as_object() - .unwrap() - .to_owned(), - has_next: false, - errors: vec![FetchError::MalformedResponse { - reason: "Something".to_string(), - } - .to_graphql_error(Some(Path::parse("arr/1")))], - extensions: Default::default(), - } - ); - } - #[tokio::test] async fn basic_request() { assert_federated_response!( @@ -718,7 +433,6 @@ mod tests { #[tokio::test] async fn variables() { - init(); assert_federated_response!( r#" query ExampleQuery($topProductsFirst: Int, $reviewsForAuthorAuthorId: ID!) { @@ -760,7 +474,7 @@ mod tests { .build(); let (response, _) = query_rust(request.clone()); let data = response - .flat_map(|x| stream::iter(x.primary().errors)) + .flat_map(|x| stream::iter(x.errors)) .collect::>() .await; let expected = vec![ @@ -793,7 +507,7 @@ mod tests { let registry = Arc::new(CountingServiceRegistry::new(HttpServiceRegistry::new( &config, ))); - let federated = FederatedGraph::new(planner, registry.to_owned()); + let federated = FederatedGraph::new(Box::new(planner), registry.clone()); (federated.stream(request), registry) } diff --git a/crates/execution/src/http_service_registry.rs b/crates/execution/src/http_service_registry.rs index b3427daed..0073681fc 100644 --- a/crates/execution/src/http_service_registry.rs +++ b/crates/execution/src/http_service_registry.rs @@ -1,16 +1,24 @@ -use std::collections::HashMap; - -use configuration::Configuration; - use crate::http_subgraph::HttpSubgraphFetcher; use crate::{GraphQLFetcher, ServiceRegistry}; +use configuration::Configuration; +use std::collections::HashMap; +use std::fmt; /// Service registry that uses http to connect to subgraphs. -#[derive(Debug)] pub struct HttpServiceRegistry { services: HashMap>, } +impl fmt::Debug for HttpServiceRegistry { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let mut debug = f.debug_tuple("HttpServiceRegistry"); + for name in self.services.keys() { + debug.field(name); + } + debug.finish() + } +} + impl HttpServiceRegistry { /// Create a new http service registry from a configuration. pub fn new(configuration: &Configuration) -> Self { diff --git a/crates/execution/src/http_subgraph.rs b/crates/execution/src/http_subgraph.rs index e85f093e7..418a0e9ee 100644 --- a/crates/execution/src/http_subgraph.rs +++ b/crates/execution/src/http_subgraph.rs @@ -3,10 +3,7 @@ use std::pin::Pin; use bytes::Bytes; use futures::prelude::*; -use crate::{ - FetchError, GraphQLFetcher, GraphQLPatchResponse, GraphQLPrimaryResponse, GraphQLRequest, - GraphQLResponse, GraphQLResponseStream, -}; +use crate::{FetchError, GraphQLFetcher, GraphQLRequest, GraphQLResponse, GraphQLResponseStream}; type BytesStream = Pin> + std::marker::Send>>; @@ -48,7 +45,6 @@ impl HttpSubgraphFetcher { Err(err) => stream::iter(vec![Err(err)]).boxed(), }) .map_err(move |err: reqwest::Error| FetchError::SubrequestHttpError { - status: err.status().map(|status| status.into()), service: service.to_owned(), reason: err.to_string(), }) @@ -64,8 +60,7 @@ impl HttpSubgraphFetcher { .map(move |(index, result)| { let service = service.to_owned(); match result { - Ok(bytes) if { index == 0 } => to_primary(service, &bytes), - Ok(bytes) => to_patch(service, &bytes), + Ok(bytes) => to_response(service, &bytes, index == 0), Err(err) => err.to_response(index == 0), } }); @@ -73,24 +68,13 @@ impl HttpSubgraphFetcher { } } -fn to_patch(service: impl Into, bytes: &Bytes) -> GraphQLResponse { - serde_json::from_slice::(&bytes) - .map(GraphQLResponse::Patch) +fn to_response(service: impl Into, bytes: &Bytes, primary: bool) -> GraphQLResponse { + serde_json::from_slice::(&bytes) .map_err(move |err| FetchError::SubrequestMalformedResponse { service: service.into(), reason: err.to_string(), }) - .unwrap_or_else(|err| err.to_patch()) -} - -fn to_primary(service: impl Into, bytes: &Bytes) -> GraphQLResponse { - serde_json::from_slice::(&bytes) - .map(GraphQLResponse::Primary) - .map_err(move |err| FetchError::SubrequestMalformedResponse { - service: service.into(), - reason: err.to_string(), - }) - .unwrap_or_else(|err| err.to_primary()) + .unwrap_or_else(|err| err.to_response(primary)) } impl GraphQLFetcher for HttpSubgraphFetcher { @@ -111,8 +95,8 @@ mod tests { #[tokio::test] async fn test_non_chunked() -> Result<(), Box> { - let response = GraphQLPrimaryResponse { - data: json!({ + let response = GraphQLResponse::builder() + .data(json!({ "allProducts": [ { "variation": { @@ -127,14 +111,8 @@ mod tests { "id": "apollo-studio" } ] - }) - .as_object() - .unwrap() - .to_owned(), - has_next: Default::default(), - errors: Default::default(), - extensions: Default::default(), - }; + })) + .build(); let server = MockServer::start(); let mock = server.mock(|when, then| { @@ -155,7 +133,7 @@ mod tests { .collect::>() .await; - assert_eq!(collect[0], GraphQLResponse::Primary(response)); + assert_eq!(collect[0], response); mock.assert(); Ok(()) } diff --git a/crates/execution/src/json_utils.rs b/crates/execution/src/json_utils.rs deleted file mode 100644 index 2e2a01c88..000000000 --- a/crates/execution/src/json_utils.rs +++ /dev/null @@ -1,173 +0,0 @@ -use serde_json::map::Entry; -use serde_json::Value; - -use crate::{Path, PathElement}; - -pub(crate) fn deep_merge(a: &mut Value, b: &Value) { - match (a, b) { - (Value::Object(a), Value::Object(b)) => { - for (key, value) in b.iter() { - match a.entry(key) { - Entry::Vacant(e) => { - e.insert(value.to_owned()); - } - Entry::Occupied(e) => { - deep_merge(e.into_mut(), value); - } - } - } - } - (Value::Array(a), Value::Array(b)) => { - for (index, value) in a.iter_mut().enumerate() { - if let Some(b) = b.get(index) { - deep_merge(value, b); - } - } - } - (_, _) => {} - } -} - -#[allow(unused)] -pub(crate) fn is_subset(subset: &Value, superset: &Value) -> bool { - match (subset, superset) { - (Value::Object(subset), Value::Object(superset)) => subset.iter().all(|(key, value)| { - if let Some(other) = superset.get(key) { - is_subset(value, other) - } else { - false - } - }), - (Value::Array(subset), Value::Array(superset)) => { - subset.len() == superset.len() - && subset.iter().enumerate().all(|(index, value)| { - if let Some(other) = superset.get(index) { - is_subset(value, other) - } else { - false - } - }) - } - (Value::String(subset), Value::String(superset)) => subset == superset, - (Value::Number(subset), Value::Number(superset)) => subset == superset, - (Value::Bool(subset), Value::Bool(superset)) => subset == superset, - - (Value::Null, Value::Null) => true, - (_, _) => false, - } -} - -pub(crate) trait JsonUtils { - /// Get a reference to the value at a particular path. - /// Note that a flatmap path element will return an array if that is the value at that path. - /// It does not actually do any flatmapping, which is instead handled by Traverser::stream. - fn get_at_path(&self, path: &Path) -> Option<&Value>; - - /// Get a reference to the value at a particular path. - /// Note that a flatmap path element will return an array if that is the value at that path. - /// It does not actually do any flatmapping, which is instead handled by Traverser::stream. - fn get_at_path_mut(&mut self, path: &Path) -> Option<&mut Value>; -} -impl JsonUtils for Option { - fn get_at_path(&self, path: &Path) -> Option<&Value> { - let mut current = self.as_ref(); - for path_element in &path.path { - current = match (path_element, current) { - (PathElement::Index(index), Some(Value::Array(array))) => array.get(*index), - (PathElement::Key(key), Some(Value::Object(object))) => object.get(key.as_str()), - (PathElement::Flatmap, Some(array)) if { array.is_array() } => Some(array), - (_, _) => None, - } - } - current - } - - fn get_at_path_mut(&mut self, path: &Path) -> Option<&mut Value> { - let mut current = self.as_mut(); - for path_element in &path.path { - current = match (path_element, current) { - (PathElement::Index(index), Some(Value::Array(array))) => array.get_mut(*index), - (PathElement::Key(key), Some(Value::Object(object))) => { - object.get_mut(key.as_str()) - } - (PathElement::Flatmap, Some(array)) if { array.is_array() } => Some(array), - (_, _) => None, - } - } - current - } -} - -#[cfg(test)] -mod tests { - use serde_json::json; - use serde_json::Value; - - use crate::json_utils::{deep_merge, is_subset, JsonUtils}; - use crate::Path; - - #[test] - fn test_get_at_path() { - let mut json = Some(json!({"obj":{"arr":[{"prop1":1},{"prop1":2}]}})); - let path = Path::parse("obj/arr/1/prop1"); - let result = json.get_at_path(&path); - assert_eq!(result, Some(&Value::Number(2.into()))); - let result_mut = json.get_at_path_mut(&path); - assert_eq!(result_mut, Some(&mut Value::Number(2.into()))); - } - - #[test] - fn test_get_at_path_flatmap() { - let mut json = Some(json!({"obj":{"arr":[{"prop1":1},{"prop1":2}]}})); - let path = Path::parse("obj/arr/@"); - let result = json.get_at_path(&path); - assert_eq!(result, Some(&json!([{"prop1":1},{"prop1":2}]))); - let result_mut = json.get_at_path_mut(&path); - assert_eq!(result_mut, Some(&mut json!([{"prop1":1},{"prop1":2}]))); - } - - #[test] - fn test_deep_merge() { - let mut json = &mut json!({"obj":{"arr":[{"prop1":1},{"prop2":2}]}}); - deep_merge( - &mut json, - &json!({"obj":{"arr":[{"prop1":1,"prop3":3},{"prop4":4}]}}), - ); - assert_eq!( - json, - &mut json!({"obj":{"arr":[{"prop1":1, "prop3":3},{"prop2":2, "prop4":4}]}}) - ); - } - - #[test] - fn test_is_subset_eq() { - assert!(is_subset( - &json!({"obj":{"arr":[{"prop1":1},{"prop4":4}]}}), - &json!({"obj":{"arr":[{"prop1":1},{"prop4":4}]}}), - )); - } - - #[test] - fn test_is_subset_missing_pop() { - assert!(is_subset( - &json!({"obj":{"arr":[{"prop1":1},{"prop4":4}]}}), - &json!({"obj":{"arr":[{"prop1":1,"prop3":3},{"prop4":4}]}}) - )); - } - - #[test] - fn test_is_subset_array_lengths_differ() { - assert!(!is_subset( - &json!({"obj":{"arr":[{"prop1":1}]}}), - &json!({"obj":{"arr":[{"prop1":1,"prop3":3},{"prop4":4}]}}) - )); - } - - #[test] - fn test_is_subset_extra_prop() { - assert!(!is_subset( - &json!({"obj":{"arr":[{"prop1":1,"prop3":3},{"prop4":4}]}}), - &json!({"obj":{"arr":[{"prop1":1},{"prop4":4}]}}) - )); - } -} diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index 0b43b101e..4d684bc78 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -1,260 +1,31 @@ //! Constructs an execution stream from q query plan -use std::fmt::{Debug, Display, Formatter}; -use std::pin::Pin; -use std::sync::Arc; - +use apollo_json_ext::prelude::*; use futures::prelude::*; -use serde::{Deserialize, Serialize}; -use serde_json::{Map, Value}; -use std::ops::{Deref, DerefMut}; -use thiserror::Error; -use typed_builder::TypedBuilder; - #[cfg(feature = "mocks")] use mockall::{automock, predicate::*}; +use query_planner::QueryPlannerError; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::pin::Pin; +use std::sync::Arc; +use typed_builder::TypedBuilder; +mod error; /// Federated graph fetcher. pub mod federated; - /// Service registry that uses http_subgraph pub mod http_service_registry; - /// Subgraph fetcher that uses http. pub mod http_subgraph; +mod response; -mod json_utils; -/// Execution context code -mod traverser; - -/// A json object -pub type Object = Map; - -/// Extensions is an untyped map that can be used to pass extra data to requests and from responses. -pub type Extensions = Object; - -/// A list of graphql errors. -pub type Errors = Vec; +pub use error::*; +pub use response::*; /// A graph response stream consists of one primary response and any number of patch responses. pub type GraphQLResponseStream = Pin + Send>>; -/// Error types for execution. Note that these are not actually returned to the client, but are -/// instead converted to Json for GraphQLError -#[derive(Error, Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] -#[serde(tag = "type")] -pub enum FetchError { - /// The query plan referenced a service that has not been configured. - #[error("Query references unknown service '{service}'")] - ValidationUnknownServiceError { - /// The service that was unknown. - service: String, - }, - - /// The variable is missing. - #[error("Query requires variable '{name}', but it was not provided")] - ValidationMissingVariable { - /// Name of the variable. - name: String, - }, - - /// The request could not be planned. - #[error("Query could not be planned")] - ValidationPlanningError { - /// The failure reason. - reason: String, - }, - - /// An error when serializing the response. - #[error("Response was malformed")] - MalformedResponse { - /// The reason the serialization failed. - reason: String, - }, - - /// An error when fetching from a service. - #[error("Service '{service}' returned no response")] - SubrequestNoResponse { - /// The service that returned no response. - service: String, - }, - - /// An error when serializing a subquery response. - #[error("Service '{service}' response was malformed")] - SubrequestMalformedResponse { - /// The service that responded with the malformed response. - service: String, - - /// The reason the serialization failed. - reason: String, - }, - - /// An http error when fetching from a service. - /// Note that this relates to a transport error and not a GraphQL error. - #[error("Http fetch failed from: '{service}'")] - SubrequestHttpError { - /// The http error code. - status: Option, - - /// The service failed. - service: String, - - /// The reason the fetch failed. - reason: String, - }, - - /// Field not found in response. - #[error("Subquery requires field '{field}' but it was not found in the current response")] - ExecutionFieldNotFound { - /// The field that is not found. - field: String, - }, - - /// The content is missing. - #[error("Missing content at '{path}'")] - ExecutionMissingContent { - /// Path to the content. - path: Path, - }, -} - -impl FetchError { - /// Convert the fetch error to a GraphQL error. - pub fn to_graphql_error(&self, path: Option) -> GraphQLError { - GraphQLError { - message: self.to_string(), - locations: Default::default(), - path: path.unwrap_or_default(), - extensions: serde_json::to_value(self) - .unwrap() - .as_object() - .unwrap() - .to_owned(), - } - } - - /// Convert the error to an appropriate response. - pub fn to_response(&self, primary: bool) -> GraphQLResponse { - if primary { - self.to_primary() - } else { - self.to_patch() - } - } - - /// Convert the fetch error to a primary graphql response. - pub fn to_primary(&self) -> GraphQLResponse { - GraphQLResponse::Primary(GraphQLPrimaryResponse { - data: Default::default(), - has_next: false, - errors: vec![self.to_graphql_error(None)], - extensions: Default::default(), - }) - } - - /// Convert the fetch error to a patch graphql response. - pub fn to_patch(&self) -> GraphQLResponse { - // Note that most of the values here will be overwritten when merged into the final response - // by the traverser. e.g. label, and path. - GraphQLResponse::Patch(GraphQLPatchResponse { - label: Default::default(), - data: Default::default(), - path: Default::default(), - has_next: false, - errors: vec![self.to_graphql_error(None)], - extensions: Default::default(), - }) - } -} - -/// A GraphQL path element that is composes of strings or numbers. -/// e.g `/book/3/name` -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -#[serde(untagged)] -pub enum PathElement { - /// An index path element. - Index(usize), - - /// A key path element. - Key(String), - - /// A path element that given an array will flatmap the content. - Flatmap, -} - -/// A path into the result document. This can be composed of strings and numbers -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Default)] -#[serde(transparent)] -pub struct Path { - path: Vec, -} - -impl Deref for Path { - type Target = Vec; - - fn deref(&self) -> &Self::Target { - &self.path - } -} - -impl DerefMut for Path { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.path - } -} - -impl Path { - fn new(path: &[PathElement]) -> Path { - Path { path: path.into() } - } - fn parse(path: impl Into) -> Path { - Path { - path: path - .into() - .split('/') - .map(|e| match (e, e.parse::()) { - (_, Ok(index)) => PathElement::Index(index), - (s, _) if s == "@" => PathElement::Flatmap, - (s, _) => PathElement::Key(s.to_string()), - }) - .collect(), - } - } - - fn empty() -> Path { - Path { - path: Default::default(), - } - } - - fn parent(&self) -> Path { - let mut path = self.path.to_owned(); - path.pop(); - Path { path } - } - - fn append(&mut self, path: &Path) { - self.path.append(&mut path.path.to_owned()); - } -} - -impl Display for Path { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str( - self.path - .iter() - .map(|e| match e { - PathElement::Index(index) => index.to_string(), - PathElement::Key(key) => key.into(), - PathElement::Flatmap => "@".into(), - }) - .collect::>() - .join("/") - .as_str(), - ) - } -} - /// A graphql request. /// Used for federated and subgraph queries. #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypedBuilder)] @@ -277,115 +48,7 @@ pub struct GraphQLRequest { /// Graphql extensions. #[serde(skip_serializing_if = "Object::is_empty", default)] #[builder(default)] - pub extensions: Extensions, -} - -/// A graphql primary response. -/// Used for federated and subgraph queries. -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct GraphQLPrimaryResponse { - /// The response data. - pub data: Object, - - /// The optional indicator that there may be more data in the form of a patch response. - #[serde(skip_serializing_if = "bool::to_owned", default)] - pub has_next: bool, - - /// The optional graphql errors encountered. - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub errors: Errors, - - /// The optional graphql extensions. - #[serde(skip_serializing_if = "Object::is_empty", default)] - pub extensions: Extensions, -} - -/// A graphql patch response . -/// Used for federated and subgraph queries. -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct GraphQLPatchResponse { - /// The label that was passed to the defer or stream directive for this patch. - pub label: String, - - /// The data to merge into the response. - pub data: Object, - - /// The path that the data should be merged at. - pub path: Path, - - /// An indicator if there is potentially more data to fetch. - pub has_next: bool, - - /// The optional errors encountered for this patch. - #[serde(skip_serializing_if = "Vec::is_empty", default)] - pub errors: Errors, - - /// The optional graphql extensions. - #[serde(skip_serializing_if = "Object::is_empty", default)] - pub extensions: Extensions, -} - -/// A GraphQL error. -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct GraphQLError { - /// The error message. - pub message: String, - - /// The locations of the error from the originating request. - pub locations: Vec, - - /// The path of the error. - pub path: Path, - - /// The optional graphql extensions. - #[serde(default, skip_serializing_if = "Object::is_empty")] - pub extensions: Extensions, -} - -/// A location in the request that triggered a graphql error. -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Location { - /// The line number. - pub line: i32, - - /// The column number. - pub column: i32, -} - -/// A GraphQL response. -/// A response stream will typically be composed of a single primary and zero or more patches. -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -#[serde(untagged)] -pub enum GraphQLResponse { - /// The first item in a stream of responses will always be a primary response. - Primary(GraphQLPrimaryResponse), - - /// Subsequent responses will always be a patch response. - Patch(GraphQLPatchResponse), -} - -impl GraphQLResponse { - /// Return as a primary response. Panics if not the right type, so should only be used in testing. - pub fn primary(self) -> GraphQLPrimaryResponse { - if let GraphQLResponse::Primary(primary) = self { - primary - } else { - panic!("Not a primary response") - } - } - - /// Return as a patch response. Panics if not the right type, so should only be used in testing. - pub fn patch(self) -> GraphQLPatchResponse { - if let GraphQLResponse::Patch(patch) = self { - patch - } else { - panic!("Not patch response") - } - } + pub extensions: Object, } /// Maintains a map of services to fetchers. @@ -402,17 +65,27 @@ pub trait ServiceRegistry: Send + Sync + Debug { /// The goal of this trait is to hide the implementation details of retching a stream of graphql responses. /// We can then create multiple implementations that can be plugged into federation. #[cfg_attr(feature = "mocks", automock)] -pub trait GraphQLFetcher: Send + Sync + Debug { +pub trait GraphQLFetcher: Send + Sync { /// Constructs a stream of responses. #[must_use = "streams do nothing unless polled"] fn stream(&self, request: GraphQLRequest) -> GraphQLResponseStream; } +impl From for FetchError { + fn from(err: QueryPlannerError) -> Self { + FetchError::ValidationPlanningError { + reason: err.to_string(), + } + } +} + #[cfg(test)] mod tests { + use super::*; use serde_json::json; + use static_assertions::*; - use super::*; + assert_obj_safe!(ServiceRegistry); #[test] fn test_request() { @@ -442,7 +115,7 @@ mod tests { #[test] fn test_response() { - let result = serde_json::from_str::( + let result = serde_json::from_str::( json!( { "errors": [ @@ -483,9 +156,8 @@ mod tests { ); assert_eq!( result.unwrap(), - GraphQLPrimaryResponse { - has_next: Default::default(), - data: json!({ + GraphQLResponse::builder() + .data(json!({ "hero": { "name": "R2-D2", "heroFriends": [ @@ -503,34 +175,33 @@ mod tests { } ] } - }) - .as_object() - .cloned() - .unwrap(), - errors: vec!(GraphQLError { + })) + .errors(vec![GraphQLError { message: "Name for character with ID 1002 could not be fetched.".into(), locations: vec!(Location { line: 6, column: 7 }), - path: Path::parse("hero/heroFriends/1/name"), + path: Path::from("hero/heroFriends/1/name"), extensions: json!({ "error-extension": 5, }) .as_object() .cloned() .unwrap() - }), - extensions: json!({ - "response-extension": 3, - }) - .as_object() - .cloned() - .unwrap() - } + }]) + .extensions( + json!({ + "response-extension": 3, + }) + .as_object() + .cloned() + .unwrap() + ) + .build() ); } #[test] fn test_patch_response() { - let result = serde_json::from_str::( + let result = serde_json::from_str::( json!( { "label": "part", @@ -574,9 +245,9 @@ mod tests { ); assert_eq!( result.unwrap(), - GraphQLPatchResponse { - label: "part".to_owned(), - data: json!({ + GraphQLResponse::builder() + .label("part".to_owned()) + .data(json!({ "hero": { "name": "R2-D2", "heroFriends": [ @@ -594,30 +265,29 @@ mod tests { } ] } - }) - .as_object() - .cloned() - .unwrap(), - path: Path::parse("hero/heroFriends/1/name"), - has_next: true, - errors: vec!(GraphQLError { + })) + .path(Path::from("hero/heroFriends/1/name")) + .has_next(true) + .errors(vec![GraphQLError { message: "Name for character with ID 1002 could not be fetched.".into(), locations: vec!(Location { line: 6, column: 7 }), - path: Path::parse("hero/heroFriends/1/name"), + path: Path::from("hero/heroFriends/1/name"), extensions: json!({ "error-extension": 5, }) .as_object() .cloned() .unwrap() - }), - extensions: json!({ - "response-extension": 3, - }) - .as_object() - .cloned() - .unwrap() - } + }]) + .extensions( + json!({ + "response-extension": 3, + }) + .as_object() + .cloned() + .unwrap() + ) + .build() ); } } diff --git a/crates/execution/src/response.rs b/crates/execution/src/response.rs new file mode 100644 index 000000000..d9b5ce2f4 --- /dev/null +++ b/crates/execution/src/response.rs @@ -0,0 +1,248 @@ +use crate::{FetchError, GraphQLError}; +use apollo_json_ext::prelude::*; +use query_planner::model::{Field, InlineFragment, Selection}; +use serde::{Deserialize, Serialize}; +use typed_builder::TypedBuilder; + +/// A graphql primary response. +/// Used for federated and subgraph queries. +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypedBuilder)] +#[serde(rename_all = "camelCase")] +#[builder(field_defaults(setter(into)))] +pub struct GraphQLResponse { + /// The label that was passed to the defer or stream directive for this patch. + #[serde(skip_serializing_if = "Option::is_none", default)] + #[builder(default)] + pub label: Option, + + /// The response data. + #[serde(skip_serializing_if = "skip_data_if", default)] + #[builder(default = Value::Object(Default::default()))] + pub data: Value, + + /// The path that the data should be merged at. + #[serde(skip_serializing_if = "Option::is_none", default)] + #[builder(default)] + pub path: Option, + + /// The optional indicator that there may be more data in the form of a patch response. + #[serde(skip_serializing_if = "Option::is_none", default)] + #[builder(default)] + pub has_next: Option, + + /// The optional graphql errors encountered. + #[serde(skip_serializing_if = "Vec::is_empty", default)] + #[builder(default)] + pub errors: Vec, + + /// The optional graphql extensions. + #[serde(skip_serializing_if = "Object::is_empty", default)] + #[builder(default)] + pub extensions: Object, +} + +fn skip_data_if(value: &Value) -> bool { + match value { + Value::Object(o) => o.is_empty(), + Value::Null => true, + _ => false, + } +} + +impl GraphQLResponse { + pub fn is_primary(&self) -> bool { + self.has_next.is_none() + } + + pub fn select(&self, path: &Path, selections: &[Selection]) -> Result { + let values = + self.data + .get_at_path(&path) + .map_err(|err| FetchError::ExecutionPathNotFound { + reason: err.to_string(), + })?; + + Ok(Value::Array( + values + .into_iter() + .flat_map(|value| match (value, selections) { + (Value::Object(content), requires) => { + select_object(&content, &requires).transpose() + } + (_, _) => Some(Err(FetchError::ExecutionInvalidContent { + reason: "not an object".to_string(), + })), + }) + .collect::, _>>()?, + )) + } + + pub fn insert_data(&mut self, path: &Path, value: Value) -> Result<(), FetchError> { + let nodes = + self.data + .get_at_path_mut(&path) + .map_err(|err| FetchError::ExecutionPathNotFound { + reason: err.to_string(), + })?; + + for node in nodes { + node.deep_merge(&value); + } + + Ok(()) + } +} + +fn select_object(content: &Object, selections: &[Selection]) -> Result, FetchError> { + let mut output = Object::new(); + for selection in selections { + match selection { + Selection::Field(field) => { + if let Some(value) = select_field(content, &field)? { + output + .entry(field.name.to_owned()) + .and_modify(|existing| existing.deep_merge(&value)) + .or_insert(value); + } + } + Selection::InlineFragment(fragment) => { + if let Some(Value::Object(value)) = select_inline_fragment(content, fragment)? { + output.append(&mut value.to_owned()) + } + } + }; + } + if output.is_empty() { + return Ok(None); + } + Ok(Some(Value::Object(output))) +} + +fn select_field(content: &Object, field: &Field) -> Result, FetchError> { + match (content.get(&field.name), &field.selections) { + (Some(Value::Object(child)), Some(selections)) => select_object(&child, selections), + (Some(value), None) => Ok(Some(value.to_owned())), + (None, _) => Err(FetchError::ExecutionFieldNotFound { + field: field.name.to_owned(), + }), + _ => Ok(None), + } +} + +fn select_inline_fragment( + content: &Object, + fragment: &InlineFragment, +) -> Result, FetchError> { + match (&fragment.type_condition, &content.get("__typename")) { + (Some(condition), Some(Value::String(typename))) => { + if condition == typename { + select_object(content, &fragment.selections) + } else { + Ok(None) + } + } + (None, _) => select_object(content, &fragment.selections), + (_, None) => Err(FetchError::ExecutionFieldNotFound { + field: "__typename".to_string(), + }), + (_, _) => Ok(None), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + macro_rules! assert_selection { + ($content:expr, $expected:expr $(,)?) => { + let response = GraphQLResponse::builder() + .data($content) + .build(); + let stub = json!([ + { + "kind": "InlineFragment", + "typeCondition": "OtherStuffToIgnore", + "selections": [], + }, + { + "kind": "InlineFragment", + "typeCondition": "User", + "selections": [ + { + "kind": "Field", + "name": "__typename", + }, + { + "kind": "Field", + "name": "id", + }, + { + "kind": "Field", + "name": "job", + "selections": [ + { + "kind": "Field", + "name": "name", + } + ], + } + ] + }, + ]); + let selection: Vec = serde_json::from_value(stub).unwrap(); + assert_eq!(response.select(&Path::empty(), &selection), $expected); + }; + } + + #[test] + fn test_selection() { + assert_selection!( + json!({"__typename": "User", "id":2, "name":"Bob", "job":{"name":"astronaut"}}), + Ok(json!([{ + "__typename": "User", + "id": 2, + "job": { + "name": "astronaut" + } + }])), + ); + } + + #[test] + fn test_selection_missing_field() { + assert_selection!( + json!({"__typename": "User", "name":"Bob", "job":{"name":"astronaut"}}), + Err(FetchError::ExecutionFieldNotFound { + field: "id".to_string(), + }), + ); + } + + #[test] + fn test_insert_data() { + let mut response = GraphQLResponse::builder() + .data(json!({ + "name": "SpongeBob", + "job": {}, + })) + .build(); + response + .insert_data( + &Path::from("job"), + json!({ + "name": "cook", + }), + ) + .unwrap(); + assert_eq!( + response.data, + json!({ + "name": "SpongeBob", + "job": { + "name": "cook", + }, + }), + ); + } +} diff --git a/crates/execution/src/traverser.rs b/crates/execution/src/traverser.rs deleted file mode 100644 index 234c63398..000000000 --- a/crates/execution/src/traverser.rs +++ /dev/null @@ -1,537 +0,0 @@ -use derivative::Derivative; -use futures::prelude::*; -use parking_lot::Mutex; -use serde_json::{Map, Value}; -use std::fmt::Formatter; -use std::pin::Pin; -use std::sync::Arc; - -use query_planner::model::{Field, InlineFragment, Selection, SelectionSet}; - -use crate::json_utils::{deep_merge, JsonUtils}; -use crate::PathElement::Flatmap; -use crate::{ - FetchError, GraphQLError, GraphQLPrimaryResponse, GraphQLRequest, GraphQLResponse, - GraphQLResponseStream, Object, Path, PathElement, -}; - -#[allow(dead_code)] -type TraverserStream = Pin + Send>>; - -/// A traverser is a object that is used to keep track of paths in the traversal and holds references -/// to the output that we want to collect. -/// Traversers may be cloned but will all share the same output via an Arc> -/// Traversers may spawn child traversers with different paths via the stream method. -#[derive(Derivative, Clone)] -#[derivative(Debug)] -pub(crate) struct Traverser { - pub(crate) path: Path, - pub(crate) content: Arc>>, - pub(crate) request: Arc, - - #[allow(dead_code)] - #[derivative(Debug(format_with = "Traverser::format_streams"))] - pub(crate) patches: Arc>>, - #[allow(dead_code)] - pub(crate) errors: Arc>>, -} - -impl Traverser { - #[allow(dead_code)] - pub(crate) fn path(&self) -> &Path { - &self.path - } - - #[allow(dead_code)] - pub(crate) fn content(&self) -> Option { - self.content.lock().to_owned() - } - - fn format_streams( - streams: &Arc>>, - fmt: &mut Formatter<'_>, - ) -> std::fmt::Result { - let streams = streams.lock(); - fmt.write_fmt(format_args!("PatchStream[{}]", streams.len())) - } - - pub(crate) fn new(request: Arc) -> Self { - Self { - path: Path::empty(), - content: Default::default(), - request, - patches: Default::default(), - errors: Default::default(), - } - } - - pub(crate) fn descendant(&self, path: &Path) -> Traverser { - let mut new_path = self.path.clone(); - new_path.append(&path); - Traverser { - path: new_path, - ..self.to_owned() - } - } - - pub(crate) fn add_graphql_error(&mut self, graphql_error: &GraphQLError) { - // Prepend the traverser path. - let mut graphql_error = graphql_error.to_owned(); - graphql_error.path.splice(0..0, self.path.to_owned().path); - self.errors.lock().push(graphql_error); - } - - pub(crate) fn add_error(&mut self, err: &FetchError) { - let graphql_error = err.to_graphql_error(Some(self.path.to_owned())); - self.errors.lock().push(graphql_error); - } - - pub(crate) fn add_errors(&mut self, errors: &[FetchError]) { - for err in errors { - self.add_error(err); - } - } - - pub(crate) fn to_primary(&self) -> GraphQLResponse { - GraphQLResponse::Primary(GraphQLPrimaryResponse { - data: match self.content.lock().to_owned() { - Some(Value::Object(obj)) => obj, - _ => Map::new(), - }, - has_next: Default::default(), - errors: self.errors.lock().to_owned(), - extensions: Default::default(), - }) - } - - pub(crate) fn merge(&mut self, value: Option<&Value>) { - { - let mut content = self.content.lock(); - match (content.get_at_path_mut(&self.path), value) { - (Some(a), Some(b)) => { - deep_merge(a, &b); - } - (None, Some(b)) => *content = Some(b.to_owned()), - (_, None) => (), - }; - } - } - - /// Create a stream of child traversers that match the supplied path in the current content \ - /// relative to the current traverser path. - pub(crate) fn stream_descendants(&self, path: &Path) -> TraverserStream { - // The root of our stream. We start at ourself! - let mut stream = stream::iter(vec![self.to_owned()]).boxed(); - - // Split the path on array. We only need to flatmap at arrays. - let path_split_by_arrays = - path.split_inclusive(|path_element| path_element == &PathElement::Flatmap); - - for path_chunk in path_split_by_arrays { - // Materialise the path chunk so it can be moved into closures. - let path_chunk = path_chunk.to_vec(); - stream = stream - .flat_map(move |traverser| { - // Fetch the child content and convert it to a stream - let descendant = traverser.descendant(&Path::new(&path_chunk)); - let content = &descendant.content.lock(); - let content_at_path = content.get_at_path(&descendant.path); - - match content_at_path { - // This was an array and we wanted a flatmap - Some(Value::Array(array)) if Some(&Flatmap) == path_chunk.last() => { - let parent = descendant.parent(); - stream::iter(0..array.len()) - .map(move |index| { - parent.descendant(&Path::new(&[PathElement::Index(index)])) - }) - .boxed() - } - // No flatmap requested, just return the element. - Some(_child) if Some(&Flatmap) != path_chunk.last() => { - stream::iter(vec![descendant.to_owned()]).boxed() - } - // Either there was nothing or there was a flatmap requested on a non array. - None | Some(_) => stream::empty().boxed(), - } - }) - .boxed(); - } - stream - } - - /// Takes a selection set and extracts a json value from the current content for sending to downstream requests. - pub(crate) fn select( - &self, - selection: &Option, - ) -> Result, FetchError> { - let content = self.content.lock(); - match (content.get_at_path(&self.path), selection) { - (Some(Value::Object(content)), Some(requires)) => select_object(&content, &requires), - (None, Some(_)) => Err(FetchError::ExecutionMissingContent { - path: self.path.clone(), - }), - _ => Ok(None), - } - } - - pub(crate) fn parent(&self) -> Traverser { - Traverser { - path: self.path.parent(), - ..self.to_owned() - } - } - - pub(crate) fn has_errors(&self) -> bool { - !self.errors.lock().is_empty() - } -} - -fn select_object(content: &Object, selections: &[Selection]) -> Result, FetchError> { - let mut output = Object::new(); - for selection in selections { - match selection { - Selection::Field(field) => { - if let Some(value) = select_field(content, &field)? { - output - .entry(field.name.to_owned()) - .and_modify(|existing| deep_merge(existing, &value)) - .or_insert(value); - } - } - Selection::InlineFragment(fragment) => { - if let Some(Value::Object(value)) = select_inline_fragment(content, fragment)? { - output.append(&mut value.to_owned()) - } - } - }; - } - if output.is_empty() { - return Ok(None); - } - Ok(Some(Value::Object(output))) -} - -fn select_field(content: &Object, field: &Field) -> Result, FetchError> { - match (content.get(&field.name), &field.selections) { - (Some(Value::Object(child)), Some(selections)) => select_object(&child, selections), - (Some(value), None) => Ok(Some(value.to_owned())), - (None, _) => Err(FetchError::ExecutionFieldNotFound { - field: field.name.to_owned(), - }), - _ => Ok(None), - } -} - -fn select_inline_fragment( - content: &Object, - fragment: &InlineFragment, -) -> Result, FetchError> { - match (&fragment.type_condition, &content.get("__typename")) { - (Some(condition), Some(Value::String(typename))) => { - if condition == typename { - select_object(content, &fragment.selections) - } else { - Ok(None) - } - } - (None, _) => select_object(content, &fragment.selections), - (_, None) => Err(FetchError::ExecutionFieldNotFound { - field: "__typename".to_string(), - }), - (_, _) => Ok(None), - } -} - -#[cfg(test)] -mod tests { - use parking_lot::Mutex; - use std::sync::Arc; - - use futures::prelude::*; - use serde_json::json; - use serde_json::Value; - - use query_planner::model::SelectionSet; - - use crate::traverser::Traverser; - use crate::{FetchError, GraphQLError, GraphQLRequest, Path}; - - impl PartialEq for Traverser { - fn eq(&self, other: &Self) -> bool { - self.path.eq(&other.path) - && self.request.eq(&other.request) - && self.content.lock().eq(&other.content.lock()) - && self.errors.lock().eq(&*other.errors.lock()) - } - } - - fn stub_request() -> GraphQLRequest { - GraphQLRequest::builder().query("").build() - } - - fn stub_traverser() -> Traverser { - Traverser { - path: Path::empty(), - content: Arc::new(Mutex::new(Some( - json!({"obj":{"arr":[{"prop1":1},{"prop1":2}]}}), - ))), - request: Arc::new(stub_request()), - patches: Arc::new(Mutex::new(vec![])), - errors: Arc::new(Mutex::new(vec![fetch_error()])), - } - } - - fn fetch_error() -> GraphQLError { - GraphQLError { - path: Path::empty(), - extensions: Default::default(), - locations: Default::default(), - message: "Nooo".into(), - } - } - - #[tokio::test] - async fn test_merge() { - let mut traverser = Traverser { - path: Path::parse("obj"), - content: Arc::new(Mutex::new(Some( - json!({"obj":{"arr":[{"prop1":1},{"prop2":2}]}}), - ))), - request: Arc::new(stub_request()), - patches: Arc::new(Mutex::new(vec![])), - errors: Arc::new(Mutex::new(vec![fetch_error()])), - }; - traverser.merge(Some(&json!({"arr":[{"prop3":3}]}))); - assert_eq!( - traverser, - Traverser { - path: Path::parse("obj"), - content: Arc::new(Mutex::new(Some( - json!({"obj":{"arr":[{"prop1":1, "prop3":3},{"prop2":2}]}}) - ))), - request: Arc::new(stub_request()), - patches: Arc::new(Mutex::new(vec![])), - errors: Arc::new(Mutex::new(vec![fetch_error()])), - } - ) - } - - #[tokio::test] - async fn test_merge_left() { - let mut traverser = Traverser { - path: Path::empty(), - content: Arc::new(Mutex::new(None)), - request: Arc::new(stub_request()), - patches: Arc::new(Mutex::new(vec![])), - errors: Arc::new(Mutex::new(vec![fetch_error()])), - }; - traverser.merge(Some(&json!({"arr":[{"prop3":3}]}))); - - assert_eq!( - traverser, - Traverser { - path: Path::empty(), - content: Arc::new(Mutex::new(Some(json!({"arr":[{"prop3":3}]})))), - request: Arc::new(stub_request()), - patches: Arc::new(Mutex::new(vec![])), - errors: Arc::new(Mutex::new(vec![fetch_error()])), - } - ) - } - - #[tokio::test] - async fn test_stream_no_array() { - assert_eq!( - stub_traverser() - .stream_descendants(&Path::parse("obj")) - .collect::>() - .await, - vec![Traverser { - path: Path::parse("obj"), - content: stub_traverser().content, - request: Arc::new(stub_request()), - patches: Arc::new(Mutex::new(vec![])), - errors: Arc::new(Mutex::new(vec![fetch_error()])), - }] - ) - } - - #[tokio::test] - async fn test_stream_from_obj() { - assert_eq!( - stub_traverser() - .stream_descendants(&Path::parse("obj")) - .next() - .await - .unwrap() - .stream_descendants(&Path::parse("arr")) - .collect::>() - .await, - vec![Traverser { - path: Path::parse("obj/arr"), - content: stub_traverser().content, - request: Arc::new(stub_request()), - patches: Arc::new(Mutex::new(vec![])), - errors: Arc::new(Mutex::new(vec![fetch_error()])), - }] - ) - } - - #[tokio::test] - async fn test_stream_with_array() { - assert_eq!( - stub_traverser() - .stream_descendants(&Path::parse("obj/arr")) - .collect::>() - .await, - vec![Traverser { - path: Path::parse("obj/arr"), - content: stub_traverser().content, - request: Arc::new(stub_request()), - patches: Arc::new(Mutex::new(vec![])), - errors: Arc::new(Mutex::new(vec![fetch_error()])), - }] - ) - } - - #[tokio::test] - async fn test_stream_flatmap() { - assert_eq!( - stub_traverser() - .stream_descendants(&Path::parse("obj/arr/@/prop1")) - .collect::>() - .await, - vec![ - Traverser { - path: Path::parse("obj/arr/0/prop1"), - content: stub_traverser().content, - request: Arc::new(stub_request()), - patches: Arc::new(Mutex::new(vec![])), - errors: Arc::new(Mutex::new(vec![fetch_error()])), - }, - Traverser { - path: Path::parse("obj/arr/1/prop1"), - content: stub_traverser().content, - request: Arc::new(stub_request()), - patches: Arc::new(Mutex::new(vec![])), - errors: Arc::new(Mutex::new(vec![fetch_error()])), - } - ] - ) - } - - fn stub_selection() -> Value { - json!([ - { - "kind": "InlineFragment", - "typeCondition": "OtherStuffToIgnore", - "selections": [], - }, - { - "kind": "InlineFragment", - "typeCondition": "User", - "selections": [ - { - "kind": "Field", - "name": "__typename" - }, - { - "kind": "Field", - "name": "id" - }, - { - "kind": "Field", - "name": "job", - "selections": [ - { - "kind": "Field", - "name": "name" - }] - } - ] - }, - ]) - } - - #[test] - fn test_selection() { - let result = selection( - stub_selection(), - Some(json!({"__typename": "User", "id":2, "name":"Bob", "job":{"name":"astronaut"}})), - Path::empty(), - ); - assert_eq!( - result, - Ok(Some(json!({ - "__typename": "User", - "id": 2, - "job": { - "name": "astronaut" - } - }))) - ); - } - - #[test] - fn test_selection_missing_field() { - let result = selection( - stub_selection(), - Some(json!({"__typename": "User", "name":"Bob", "job":{"name":"astronaut"}})), - Path::empty(), - ); - assert_eq!( - result, - Err(FetchError::ExecutionFieldNotFound { - field: "id".to_string() - }), - ); - } - - #[test] - fn test_selection_no_content() { - let result = selection(stub_selection(), None, Path::empty()); - assert_eq!( - result, - Err(FetchError::ExecutionMissingContent { - path: Path::empty() - }) - ); - } - - #[test] - fn test_selection_at_path() { - let result = selection( - json!([{ - "kind": "Field", - "name": "name" - }]), - Some(json!({"__typename": "User", "id":2, "name":"Bob", "job":{"name":"astronaut"}})), - Path::parse("job"), - ); - assert_eq!( - result, - Ok(Some(json!({ - "name": "astronaut" - }))) - ); - } - - fn selection( - selection_set: Value, - content: Option, - path: Path, - ) -> Result, FetchError> { - let selection_set = serde_json::from_value::(selection_set).unwrap(); - - let traverser = Traverser { - path, - content: Arc::new(Mutex::new(content)), - request: Arc::new(stub_request()), - patches: Arc::new(Mutex::new(vec![])), - errors: Arc::new(Mutex::new(vec![])), - }; - - traverser.select(&Some(selection_set)) - } -} diff --git a/crates/query-planner/Cargo.toml b/crates/query-planner/Cargo.toml index feac27228..811b6d588 100644 --- a/crates/query-planner/Cargo.toml +++ b/crates/query-planner/Cargo.toml @@ -8,13 +8,15 @@ license-file = "./LICENSE" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +apollo-json-ext = { path = "../apollo-json-ext" } +derive_more = "0.99.16" harmonizer = { git = "https://github.com/apollographql/federation.git", rev = "34507803"} -serde_json = "1.0.66" +parking_lot = "0.11" serde = { version = "1.0.129", features = ["derive"] } -derive_more = "0.99.16" +serde_json = "1.0.66" thiserror = "1.0.26" - [dev-dependencies] insta = "1.7.2" mockall = "0.10.2" +static_assertions = "1" diff --git a/crates/query-planner/src/caching.rs b/crates/query-planner/src/caching.rs index c158d93b5..84a64853b 100644 --- a/crates/query-planner/src/caching.rs +++ b/crates/query-planner/src/caching.rs @@ -1,17 +1,19 @@ -use std::collections::HashMap; - -/// Caching query planner that caches responses from a delegate. use crate::model::QueryPlan; use crate::{QueryPlanOptions, QueryPlanner, QueryPlannerError}; +use parking_lot::Mutex; +use std::collections::HashMap; -/// A query planner decorator that caches results. +/// A query planner wrapper that caches results. +/// /// There is no eviction strategy, query plans will be retained forever. #[derive(Debug)] pub struct CachingQueryPlanner { delegate: T, - cached: HashMap< - (String, Option, crate::QueryPlanOptions), - Result, + cached: Mutex< + HashMap< + (String, Option, crate::QueryPlanOptions), + Result, + >, >, } @@ -19,22 +21,22 @@ impl CachingQueryPlanner { fn new(delegate: T) -> CachingQueryPlanner { Self { delegate, - cached: HashMap::new(), + cached: Default::default(), } } } impl crate::QueryPlanner for CachingQueryPlanner { fn get( - &mut self, + &self, query: String, operation: Option, options: QueryPlanOptions, ) -> Result { - let delegate = &mut self.delegate; self.cached + .lock() .entry((query.clone(), operation.clone(), options.clone())) - .or_insert_with(|| delegate.get(query, operation, options)) + .or_insert_with(|| self.delegate.get(query, operation, options)) .clone() } } @@ -68,7 +70,7 @@ mod tests { parse_errors: "".into(), })); - let mut planner = delegate.with_caching(); + let planner = delegate.with_caching(); for _ in 0..5 { assert!(planner diff --git a/crates/query-planner/src/harmonizer.rs b/crates/query-planner/src/harmonizer.rs index 95bd7d9b5..6befa4392 100644 --- a/crates/query-planner/src/harmonizer.rs +++ b/crates/query-planner/src/harmonizer.rs @@ -22,7 +22,7 @@ impl HarmonizerQueryPlanner { impl crate::QueryPlanner for HarmonizerQueryPlanner { fn get( - &mut self, + &self, query: String, operation: Option, options: QueryPlanOptions, @@ -71,8 +71,7 @@ mod tests { #[test] fn test_plan() { - let mut planner = - HarmonizerQueryPlanner::new(include_str!("testdata/schema.graphql").into()); + let planner = HarmonizerQueryPlanner::new(include_str!("testdata/schema.graphql").into()); let result = planner.get( include_str!("testdata/query.graphql").into(), None, @@ -93,7 +92,7 @@ mod tests { #[test] fn test_plan_error() { - let mut planner = HarmonizerQueryPlanner::new("".to_string()); + let planner = HarmonizerQueryPlanner::new("".to_string()); let result = planner.get("".into(), None, QueryPlanOptions::default()); assert_eq!( "Query planning had errors: Planning errors: UNKNOWN: Syntax Error: Unexpected .", diff --git a/crates/query-planner/src/lib.rs b/crates/query-planner/src/lib.rs index 2c18dbcde..389c24470 100644 --- a/crates/query-planner/src/lib.rs +++ b/crates/query-planner/src/lib.rs @@ -37,9 +37,9 @@ pub enum QueryPlannerError { pub struct QueryPlanOptions {} /// Query planning options. -impl QueryPlanOptions { +impl Default for QueryPlanOptions { /// Default query planning options. - pub fn default() -> QueryPlanOptions { + fn default() -> QueryPlanOptions { QueryPlanOptions {} } } @@ -47,14 +47,22 @@ impl QueryPlanOptions { /// QueryPlanner can be used to plan queries. /// Implementations may cache query plans. #[cfg_attr(test, automock)] -pub trait QueryPlanner: Send { +pub trait QueryPlanner: Send + Sync { /// Returns a query plan given the query, operation and options. /// Implementations may cache query plans. #[must_use = "query plan result must be used"] fn get( - &mut self, + &self, query: String, operation: Option, options: QueryPlanOptions, ) -> Result; } + +#[cfg(test)] +mod tests { + use super::*; + use static_assertions::*; + + assert_obj_safe!(QueryPlanner); +} diff --git a/crates/query-planner/src/model.rs b/crates/query-planner/src/model.rs index c5ba7d88a..a00e44f9b 100644 --- a/crates/query-planner/src/model.rs +++ b/crates/query-planner/src/model.rs @@ -3,6 +3,7 @@ //! //! QueryPlans are a set of operations that describe how a federated query is processed. +use apollo_json_ext::prelude::*; use serde::{Deserialize, Serialize}; /// The root query plan container @@ -73,13 +74,13 @@ pub struct FetchNode { /// The data that is required for the subgraph fetch. #[serde(skip_serializing_if = "Option::is_none")] - pub requires: Option, + pub requires: Option>, /// The variables that are used for the subgraph fetch. pub variable_usages: Vec, /// The GraphQL subquery that is used for the fetch. - pub operation: GraphQLQuery, + pub operation: String, } /// A flatten node. @@ -87,7 +88,7 @@ pub struct FetchNode { #[serde(rename_all = "camelCase")] pub struct FlattenNode { /// The path when result should be merged. - pub path: ResponsePath, + pub path: Path, /// The child execution plan. pub node: Box, @@ -118,7 +119,7 @@ pub struct Field { /// The selections for the field. #[serde(skip_serializing_if = "Option::is_none")] - pub selections: Option, + pub selections: Option>, } /// An inline fragment. @@ -130,18 +131,9 @@ pub struct InlineFragment { pub type_condition: Option, /// The selections from the fragment. - pub selections: SelectionSet, + pub selections: Vec, } -/// A selection set is a list of data required for a fetch. -pub type SelectionSet = Vec; - -/// A string representing a graphql query. -pub type GraphQLQuery = String; - -///A path where a the response is merged in to the result. -pub type ResponsePath = Vec; - #[cfg(test)] mod tests { use serde_json::Value; @@ -169,8 +161,7 @@ mod tests { PlanNode::Sequence { nodes: vec![ PlanNode::Flatten(FlattenNode { - path: vec![ - "topProducts".to_owned(), "@".to_owned()], + path: Path::from("topProducts/@"), node: Box::new(PlanNode::Fetch(FetchNode { service_name: "books".to_owned(), variable_usages: vec!["test_variable".into()], @@ -193,9 +184,7 @@ mod tests { })), }), PlanNode::Flatten(FlattenNode { - path: vec![ - "topProducts".to_owned(), - "@".to_owned()], + path: Path::from("topProducts/@"), node: Box::new(PlanNode::Fetch(FetchNode { service_name: "product".to_owned(), variable_usages: vec![], @@ -231,7 +220,7 @@ mod tests { PlanNode::Sequence { nodes: vec![ PlanNode::Flatten(FlattenNode { - path: vec!["product".to_owned()], + path: Path::from("product"), node: Box::new(PlanNode::Fetch(FetchNode { service_name: "books".to_owned(), variable_usages: vec![], @@ -254,7 +243,7 @@ mod tests { })), }), PlanNode::Flatten(FlattenNode { - path: vec!["product".to_owned()], + path: Path::from("product"), node: Box::new(PlanNode::Fetch(FetchNode { service_name: "product".to_owned(), variable_usages: vec![], diff --git a/crates/server/src/graph_factory.rs b/crates/server/src/graph_factory.rs index 364bc483b..9997f680a 100644 --- a/crates/server/src/graph_factory.rs +++ b/crates/server/src/graph_factory.rs @@ -27,7 +27,7 @@ impl GraphFactory for FederatedGraphFactory { fn create(&self, configuration: &Configuration, schema: &str) -> FederatedGraph { let service_registry = HttpServiceRegistry::new(configuration); FederatedGraph::new( - HarmonizerQueryPlanner::new(schema.to_owned()).with_caching(), + Box::new(HarmonizerQueryPlanner::new(schema.to_owned()).with_caching()), Arc::new(service_registry), ) } diff --git a/crates/server/src/hyper_http_server_factory.rs b/crates/server/src/hyper_http_server_factory.rs index d31008427..7cc9395b2 100644 --- a/crates/server/src/hyper_http_server_factory.rs +++ b/crates/server/src/hyper_http_server_factory.rs @@ -250,8 +250,7 @@ mod tests { use serde_json::json; use execution::{ - FetchError, GraphQLFetcher, GraphQLPrimaryResponse, GraphQLRequest, GraphQLResponse, - GraphQLResponseStream, + FetchError, GraphQLFetcher, GraphQLRequest, GraphQLResponse, GraphQLResponseStream, }; use super::*; @@ -354,18 +353,9 @@ mod tests { #[tokio::test] async fn response() -> Result<(), FederatedServerError> { - let expected_response = GraphQLPrimaryResponse { - data: json!( - { - "response": "yay", - }) - .as_object() - .cloned() - .unwrap(), - has_next: Default::default(), - errors: Default::default(), - extensions: Default::default(), - }; + let expected_response = GraphQLResponse::builder() + .data(json!({"response": "yay"})) + .build(); let example_response = expected_response.clone(); let (fetcher, server, client) = init("127.0.0.1:0"); { @@ -373,9 +363,7 @@ mod tests { .write() .expect_stream() .times(1) - .return_once(move |_| { - futures::stream::iter(vec![GraphQLResponse::Primary(example_response)]).boxed() - }); + .return_once(move |_| futures::stream::iter(vec![example_response]).boxed()); } let response = client .post(format!("http://{}/graphql", server.listen_address)) @@ -393,7 +381,7 @@ mod tests { .expect("unexpected response"); assert_eq!( - response.json::().await.unwrap(), + response.json::().await.unwrap(), expected_response, ); @@ -406,11 +394,10 @@ mod tests { { fetcher.write().expect_stream().times(1).return_once(|_| { futures::stream::iter(vec![FetchError::SubrequestHttpError { - status: Some(401), service: "Mock service".to_string(), reason: "Mock error".to_string(), } - .to_primary()]) + .to_response(true)]) .boxed() }); } @@ -427,19 +414,17 @@ mod tests { .await .ok() .unwrap() - .json::() + .json::() .await .unwrap(); assert_eq!( response, FetchError::SubrequestHttpError { - status: Some(401), service: "Mock service".to_string(), reason: "Mock error".to_string(), } - .to_primary() - .primary() + .to_response(true) ); server.shutdown().await } diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index 8be5ebf1b..5ae47fa39 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -533,7 +533,7 @@ mod tests { let request = GraphQLRequest::builder().query(request).build(); let mut expected = query(socket, request.clone()); - let expected = expected.next().await.unwrap().primary(); + let expected = expected.next().await.unwrap(); let response = to_string_pretty(&expected).unwrap(); assert!(!response.is_empty()); }