diff --git a/service/src/graph_node.rs b/service/src/graph_node.rs index 621ff9d8..23a1c9db 100644 --- a/service/src/graph_node.rs +++ b/service/src/graph_node.rs @@ -1,16 +1,23 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use std::sync::Arc; + +use anyhow::anyhow; use reqwest::{header, Client, Url}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::query_processor::UnattestedQueryResult; +use crate::query_processor::{QueryError, UnattestedQueryResult}; +/// Graph node query wrapper. +/// +/// This is Arc internally, so it can be cloned and shared between threads. #[derive(Debug, Clone)] pub struct GraphNodeInstance { - client: Client, - base_url: String, + client: Client, // it is Arc + subgraphs_base_url: Arc, + network_subgraph_url: Arc, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -21,26 +28,38 @@ struct GraphQLQuery { } impl GraphNodeInstance { - pub fn new(base_url: &str) -> GraphNodeInstance { + pub fn new(endpoint: &str, network_subgraph_id: &str) -> GraphNodeInstance { + let subgraphs_base_url = Url::parse(endpoint) + .and_then(|u| u.join("/subgraphs/id")) + .expect("Could not parse graph node endpoint"); + let network_subgraph_url = subgraphs_base_url + .join(network_subgraph_id) + .expect("Could not parse graph node endpoint"); let client = reqwest::Client::builder() .user_agent("indexer-service") .build() .expect("Could not build a client to graph node query endpoint"); GraphNodeInstance { client, - base_url: base_url.to_string(), + subgraphs_base_url: Arc::new(subgraphs_base_url), + network_subgraph_url: Arc::new(network_subgraph_url), } } pub async fn subgraph_query_raw( &self, - endpoint: &str, - body: String, - ) -> Result { + subgraph_id: &str, + data: String, + ) -> Result { let request = self .client - .post(format!("{}/subgraphs/id/{}", self.base_url, endpoint)) - .body(body) + .post(self.subgraphs_base_url.join(subgraph_id).map_err(|e| { + QueryError::Other(anyhow!( + "Could not build subgraph query URL: {}", + e.to_string() + )) + })?) + .body(data) .header(header::CONTENT_TYPE, "application/json"); let response = request.send().await?; @@ -57,12 +76,11 @@ impl GraphNodeInstance { pub async fn network_query_raw( &self, - endpoint: Url, body: String, ) -> Result { let request = self .client - .post(endpoint) + .post(Url::clone(&self.network_subgraph_url)) .body(body.clone()) .header(header::CONTENT_TYPE, "application/json"); @@ -78,14 +96,12 @@ impl GraphNodeInstance { pub async fn network_query( &self, - endpoint: Url, query: String, variables: Option, ) -> Result { let body = GraphQLQuery { query, variables }; self.network_query_raw( - endpoint, serde_json::to_string(&body).expect("serialize network GraphQL query"), ) .await diff --git a/service/src/main.rs b/service/src/main.rs index 3496e9e9..51c91ca4 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -55,13 +55,16 @@ async fn main() -> Result<(), std::io::Error> { let config = Cli::args(); let release = package_version().expect("Failed to resolve for release version"); - // Proper initiation of server, query processor - // server health check, graph-node instance connection check - let query_processor = QueryProcessor::new( + // Initialize graph-node client + let graph_node = graph_node::GraphNodeInstance::new( &config.indexer_infrastructure.graph_node_query_endpoint, &config.network_subgraph.network_subgraph_endpoint, ); + // Proper initiation of server, query processor + // server health check, graph-node instance connection check + let query_processor = QueryProcessor::new(graph_node.clone()); + // Start indexer service basic metrics tokio::spawn(handle_serve_metrics( String::from("0.0.0.0"), diff --git a/service/src/query_processor.rs b/service/src/query_processor.rs index 211273af..9b70a694 100644 --- a/service/src/query_processor.rs +++ b/service/src/query_processor.rs @@ -7,7 +7,6 @@ use ethers_core::types::Address; use ethers_core::types::{Signature, U256}; use log::error; use native::attestation::AttestationSigner; -use reqwest::{Client, Url}; use serde::{Deserialize, Serialize}; use tap_core::tap_manager::SignedReceipt; @@ -148,23 +147,14 @@ pub enum QueryError { #[derive(Debug, Clone)] pub struct QueryProcessor { - client: Client, - base: Url, graph_node: GraphNodeInstance, - network_subgraph: Url, signers: HashMap, } impl QueryProcessor { - pub fn new(graph_node_endpoint: &str, network_subgraph_endpoint: &str) -> QueryProcessor { - let graph_node = GraphNodeInstance::new(graph_node_endpoint); - + pub fn new(graph_node: GraphNodeInstance) -> QueryProcessor { QueryProcessor { - client: Client::new(), - base: Url::parse(graph_node_endpoint).expect("Could not parse graph node endpoint"), graph_node, - network_subgraph: Url::parse(network_subgraph_endpoint) - .expect("Could not parse graph node endpoint"), // TODO: populate signers signers: HashMap::new(), } @@ -189,10 +179,7 @@ impl QueryProcessor { &self, query: String, ) -> Result, QueryError> { - let response = self - .graph_node - .network_query_raw(self.network_subgraph.clone(), query) - .await?; + let response = self.graph_node.network_query_raw(query).await?; Ok(Response { result: response,