Skip to content

Commit

Permalink
Merge pull request #28 from graphops/aasseman/query_processor_refactor
Browse files Browse the repository at this point in the history
refactor: move graph-node instance to main
  • Loading branch information
aasseman authored Aug 15, 2023
2 parents ce19e8c + 12c47a2 commit 2b87043
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 32 deletions.
44 changes: 30 additions & 14 deletions service/src/graph_node.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
// Copyright 2023-, GraphOps and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use anyhow::anyhow;
use reqwest::{header, Client, Url};
use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::query_processor::UnattestedQueryResult;
use crate::query_processor::{QueryError, UnattestedQueryResult};

/// Graph node query wrapper.
///
/// This is Arc internally, so it can be cloned and shared between threads.
#[derive(Debug, Clone)]
pub struct GraphNodeInstance {
client: Client,
base_url: String,
client: Client, // it is Arc
subgraphs_base_url: Arc<Url>,
network_subgraph_url: Arc<Url>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -21,26 +28,38 @@ struct GraphQLQuery {
}

impl GraphNodeInstance {
pub fn new(base_url: &str) -> GraphNodeInstance {
pub fn new(endpoint: &str, network_subgraph_id: &str) -> GraphNodeInstance {
let subgraphs_base_url = Url::parse(endpoint)
.and_then(|u| u.join("/subgraphs/id"))
.expect("Could not parse graph node endpoint");
let network_subgraph_url = subgraphs_base_url
.join(network_subgraph_id)
.expect("Could not parse graph node endpoint");
let client = reqwest::Client::builder()
.user_agent("indexer-service")
.build()
.expect("Could not build a client to graph node query endpoint");
GraphNodeInstance {
client,
base_url: base_url.to_string(),
subgraphs_base_url: Arc::new(subgraphs_base_url),
network_subgraph_url: Arc::new(network_subgraph_url),
}
}

pub async fn subgraph_query_raw(
&self,
endpoint: &str,
body: String,
) -> Result<UnattestedQueryResult, reqwest::Error> {
subgraph_id: &str,
data: String,
) -> Result<UnattestedQueryResult, QueryError> {
let request = self
.client
.post(format!("{}/subgraphs/id/{}", self.base_url, endpoint))
.body(body)
.post(self.subgraphs_base_url.join(subgraph_id).map_err(|e| {
QueryError::Other(anyhow!(
"Could not build subgraph query URL: {}",
e.to_string()
))
})?)
.body(data)
.header(header::CONTENT_TYPE, "application/json");

let response = request.send().await?;
Expand All @@ -57,12 +76,11 @@ impl GraphNodeInstance {

pub async fn network_query_raw(
&self,
endpoint: Url,
body: String,
) -> Result<UnattestedQueryResult, reqwest::Error> {
let request = self
.client
.post(endpoint)
.post(Url::clone(&self.network_subgraph_url))
.body(body.clone())
.header(header::CONTENT_TYPE, "application/json");

Expand All @@ -78,14 +96,12 @@ impl GraphNodeInstance {

pub async fn network_query(
&self,
endpoint: Url,
query: String,
variables: Option<Value>,
) -> Result<UnattestedQueryResult, reqwest::Error> {
let body = GraphQLQuery { query, variables };

self.network_query_raw(
endpoint,
serde_json::to_string(&body).expect("serialize network GraphQL query"),
)
.await
Expand Down
9 changes: 6 additions & 3 deletions service/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,16 @@ async fn main() -> Result<(), std::io::Error> {
let config = Cli::args();
let release = package_version().expect("Failed to resolve for release version");

// Proper initiation of server, query processor
// server health check, graph-node instance connection check
let query_processor = QueryProcessor::new(
// Initialize graph-node client
let graph_node = graph_node::GraphNodeInstance::new(
&config.indexer_infrastructure.graph_node_query_endpoint,
&config.network_subgraph.network_subgraph_endpoint,
);

// Proper initiation of server, query processor
// server health check, graph-node instance connection check
let query_processor = QueryProcessor::new(graph_node.clone());

// Start indexer service basic metrics
tokio::spawn(handle_serve_metrics(
String::from("0.0.0.0"),
Expand Down
17 changes: 2 additions & 15 deletions service/src/query_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use ethers_core::types::Address;
use ethers_core::types::{Signature, U256};
use log::error;
use native::attestation::AttestationSigner;
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use tap_core::tap_manager::SignedReceipt;

Expand Down Expand Up @@ -148,23 +147,14 @@ pub enum QueryError {

#[derive(Debug, Clone)]
pub struct QueryProcessor {
client: Client,
base: Url,
graph_node: GraphNodeInstance,
network_subgraph: Url,
signers: HashMap<Address, AttestationSigner>,
}

impl QueryProcessor {
pub fn new(graph_node_endpoint: &str, network_subgraph_endpoint: &str) -> QueryProcessor {
let graph_node = GraphNodeInstance::new(graph_node_endpoint);

pub fn new(graph_node: GraphNodeInstance) -> QueryProcessor {
QueryProcessor {
client: Client::new(),
base: Url::parse(graph_node_endpoint).expect("Could not parse graph node endpoint"),
graph_node,
network_subgraph: Url::parse(network_subgraph_endpoint)
.expect("Could not parse graph node endpoint"),
// TODO: populate signers
signers: HashMap::new(),
}
Expand All @@ -189,10 +179,7 @@ impl QueryProcessor {
&self,
query: String,
) -> Result<Response<UnattestedQueryResult>, QueryError> {
let response = self
.graph_node
.network_query_raw(self.network_subgraph.clone(), query)
.await?;
let response = self.graph_node.network_query_raw(query).await?;

Ok(Response {
result: response,
Expand Down

0 comments on commit 2b87043

Please sign in to comment.