From ae7042110a401ff886b125cf7ac426d72925d7cb Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Fri, 11 Aug 2023 18:31:55 -0700 Subject: [PATCH 1/3] refactor: move graph-node instance to main Signed-off-by: Alexis Asseman --- service/src/graph_node.rs | 12 +++++++++--- service/src/main.rs | 7 ++++++- service/src/query_processor.rs | 10 ++-------- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/service/src/graph_node.rs b/service/src/graph_node.rs index 621ff9d8..61535dfa 100644 --- a/service/src/graph_node.rs +++ b/service/src/graph_node.rs @@ -1,16 +1,21 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use std::sync::Arc; + use reqwest::{header, Client, Url}; use serde::{Deserialize, Serialize}; use serde_json::Value; use crate::query_processor::UnattestedQueryResult; +/// Graph node query wrapper. +/// +/// This is Arc internally, so it can be cloned and shared between threads. #[derive(Debug, Clone)] pub struct GraphNodeInstance { - client: Client, - base_url: String, + client: Client, // it is Arc + base_url: Arc, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -22,13 +27,14 @@ struct GraphQLQuery { impl GraphNodeInstance { pub fn new(base_url: &str) -> GraphNodeInstance { + let base_url = Url::parse(base_url).expect("Could not parse graph node endpoint"); let client = reqwest::Client::builder() .user_agent("indexer-service") .build() .expect("Could not build a client to graph node query endpoint"); GraphNodeInstance { client, - base_url: base_url.to_string(), + base_url: Arc::new(base_url.to_string()), } } diff --git a/service/src/main.rs b/service/src/main.rs index 3496e9e9..ffec1224 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -55,10 +55,15 @@ async fn main() -> Result<(), std::io::Error> { let config = Cli::args(); let release = package_version().expect("Failed to resolve for release version"); + // Initialize graph-node client + let graph_node = graph_node::GraphNodeInstance::new( + &config.indexer_infrastructure.graph_node_query_endpoint, + ); + // Proper initiation of server, query processor // server health check, graph-node instance connection check let query_processor = QueryProcessor::new( - &config.indexer_infrastructure.graph_node_query_endpoint, + graph_node.clone(), &config.network_subgraph.network_subgraph_endpoint, ); diff --git a/service/src/query_processor.rs b/service/src/query_processor.rs index 211273af..53ab3724 100644 --- a/service/src/query_processor.rs +++ b/service/src/query_processor.rs @@ -7,7 +7,7 @@ use ethers_core::types::Address; use ethers_core::types::{Signature, U256}; use log::error; use native::attestation::AttestationSigner; -use reqwest::{Client, Url}; +use reqwest::Url; use serde::{Deserialize, Serialize}; use tap_core::tap_manager::SignedReceipt; @@ -148,20 +148,14 @@ pub enum QueryError { #[derive(Debug, Clone)] pub struct QueryProcessor { - client: Client, - base: Url, graph_node: GraphNodeInstance, network_subgraph: Url, signers: HashMap, } impl QueryProcessor { - pub fn new(graph_node_endpoint: &str, network_subgraph_endpoint: &str) -> QueryProcessor { - let graph_node = GraphNodeInstance::new(graph_node_endpoint); - + pub fn new(graph_node: GraphNodeInstance, network_subgraph_endpoint: &str) -> QueryProcessor { QueryProcessor { - client: Client::new(), - base: Url::parse(graph_node_endpoint).expect("Could not parse graph node endpoint"), graph_node, network_subgraph: Url::parse(network_subgraph_endpoint) .expect("Could not parse graph node endpoint"), From ca7f99528a99db4a098754b3d7245a1399824e7e Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Fri, 11 Aug 2023 19:24:45 -0700 Subject: [PATCH 2/3] refactor: move network url into GraphNodeInstance Signed-off-by: Alexis Asseman --- service/src/graph_node.rs | 10 ++++++---- service/src/main.rs | 6 ++---- service/src/query_processor.rs | 11 ++--------- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/service/src/graph_node.rs b/service/src/graph_node.rs index 61535dfa..0ac9c5e8 100644 --- a/service/src/graph_node.rs +++ b/service/src/graph_node.rs @@ -16,6 +16,7 @@ use crate::query_processor::UnattestedQueryResult; pub struct GraphNodeInstance { client: Client, // it is Arc base_url: Arc, + network_subgraph: Arc, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -26,8 +27,10 @@ struct GraphQLQuery { } impl GraphNodeInstance { - pub fn new(base_url: &str) -> GraphNodeInstance { + pub fn new(base_url: &str, network_subgraph: &str) -> GraphNodeInstance { let base_url = Url::parse(base_url).expect("Could not parse graph node endpoint"); + let network_subgraph = + Url::parse(network_subgraph).expect("Could not parse graph node endpoint"); let client = reqwest::Client::builder() .user_agent("indexer-service") .build() @@ -35,6 +38,7 @@ impl GraphNodeInstance { GraphNodeInstance { client, base_url: Arc::new(base_url.to_string()), + network_subgraph: Arc::new(network_subgraph), } } @@ -63,12 +67,11 @@ impl GraphNodeInstance { pub async fn network_query_raw( &self, - endpoint: Url, body: String, ) -> Result { let request = self .client - .post(endpoint) + .post(Url::clone(&self.network_subgraph)) .body(body.clone()) .header(header::CONTENT_TYPE, "application/json"); @@ -91,7 +94,6 @@ impl GraphNodeInstance { let body = GraphQLQuery { query, variables }; self.network_query_raw( - endpoint, serde_json::to_string(&body).expect("serialize network GraphQL query"), ) .await diff --git a/service/src/main.rs b/service/src/main.rs index ffec1224..51c91ca4 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -58,14 +58,12 @@ async fn main() -> Result<(), std::io::Error> { // Initialize graph-node client let graph_node = graph_node::GraphNodeInstance::new( &config.indexer_infrastructure.graph_node_query_endpoint, + &config.network_subgraph.network_subgraph_endpoint, ); // Proper initiation of server, query processor // server health check, graph-node instance connection check - let query_processor = QueryProcessor::new( - graph_node.clone(), - &config.network_subgraph.network_subgraph_endpoint, - ); + let query_processor = QueryProcessor::new(graph_node.clone()); // Start indexer service basic metrics tokio::spawn(handle_serve_metrics( diff --git a/service/src/query_processor.rs b/service/src/query_processor.rs index 53ab3724..9b70a694 100644 --- a/service/src/query_processor.rs +++ b/service/src/query_processor.rs @@ -7,7 +7,6 @@ use ethers_core::types::Address; use ethers_core::types::{Signature, U256}; use log::error; use native::attestation::AttestationSigner; -use reqwest::Url; use serde::{Deserialize, Serialize}; use tap_core::tap_manager::SignedReceipt; @@ -149,16 +148,13 @@ pub enum QueryError { #[derive(Debug, Clone)] pub struct QueryProcessor { graph_node: GraphNodeInstance, - network_subgraph: Url, signers: HashMap, } impl QueryProcessor { - pub fn new(graph_node: GraphNodeInstance, network_subgraph_endpoint: &str) -> QueryProcessor { + pub fn new(graph_node: GraphNodeInstance) -> QueryProcessor { QueryProcessor { graph_node, - network_subgraph: Url::parse(network_subgraph_endpoint) - .expect("Could not parse graph node endpoint"), // TODO: populate signers signers: HashMap::new(), } @@ -183,10 +179,7 @@ impl QueryProcessor { &self, query: String, ) -> Result, QueryError> { - let response = self - .graph_node - .network_query_raw(self.network_subgraph.clone(), query) - .await?; + let response = self.graph_node.network_query_raw(query).await?; Ok(Response { result: response, From 12c47a251bc9dde56d60488b0fcd65667e619cb1 Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Mon, 14 Aug 2023 18:14:17 -0700 Subject: [PATCH 3/3] fix: Robust URL joins in GraphNodeInstance Signed-off-by: Alexis Asseman --- service/src/graph_node.rs | 40 +++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/service/src/graph_node.rs b/service/src/graph_node.rs index 0ac9c5e8..23a1c9db 100644 --- a/service/src/graph_node.rs +++ b/service/src/graph_node.rs @@ -3,11 +3,12 @@ use std::sync::Arc; +use anyhow::anyhow; use reqwest::{header, Client, Url}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::query_processor::UnattestedQueryResult; +use crate::query_processor::{QueryError, UnattestedQueryResult}; /// Graph node query wrapper. /// @@ -15,8 +16,8 @@ use crate::query_processor::UnattestedQueryResult; #[derive(Debug, Clone)] pub struct GraphNodeInstance { client: Client, // it is Arc - base_url: Arc, - network_subgraph: Arc, + subgraphs_base_url: Arc, + network_subgraph_url: Arc, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -27,30 +28,38 @@ struct GraphQLQuery { } impl GraphNodeInstance { - pub fn new(base_url: &str, network_subgraph: &str) -> GraphNodeInstance { - let base_url = Url::parse(base_url).expect("Could not parse graph node endpoint"); - let network_subgraph = - Url::parse(network_subgraph).expect("Could not parse graph node endpoint"); + pub fn new(endpoint: &str, network_subgraph_id: &str) -> GraphNodeInstance { + let subgraphs_base_url = Url::parse(endpoint) + .and_then(|u| u.join("/subgraphs/id")) + .expect("Could not parse graph node endpoint"); + let network_subgraph_url = subgraphs_base_url + .join(network_subgraph_id) + .expect("Could not parse graph node endpoint"); let client = reqwest::Client::builder() .user_agent("indexer-service") .build() .expect("Could not build a client to graph node query endpoint"); GraphNodeInstance { client, - base_url: Arc::new(base_url.to_string()), - network_subgraph: Arc::new(network_subgraph), + subgraphs_base_url: Arc::new(subgraphs_base_url), + network_subgraph_url: Arc::new(network_subgraph_url), } } pub async fn subgraph_query_raw( &self, - endpoint: &str, - body: String, - ) -> Result { + subgraph_id: &str, + data: String, + ) -> Result { let request = self .client - .post(format!("{}/subgraphs/id/{}", self.base_url, endpoint)) - .body(body) + .post(self.subgraphs_base_url.join(subgraph_id).map_err(|e| { + QueryError::Other(anyhow!( + "Could not build subgraph query URL: {}", + e.to_string() + )) + })?) + .body(data) .header(header::CONTENT_TYPE, "application/json"); let response = request.send().await?; @@ -71,7 +80,7 @@ impl GraphNodeInstance { ) -> Result { let request = self .client - .post(Url::clone(&self.network_subgraph)) + .post(Url::clone(&self.network_subgraph_url)) .body(body.clone()) .header(header::CONTENT_TYPE, "application/json"); @@ -87,7 +96,6 @@ impl GraphNodeInstance { pub async fn network_query( &self, - endpoint: Url, query: String, variables: Option, ) -> Result {