From 7cacbdb79760f3a3058f9d07e8bfbc18afe1bc18 Mon Sep 17 00:00:00 2001 From: Filippo Neysofu Costa Date: Mon, 31 Jul 2023 16:52:17 +0200 Subject: [PATCH] Allow multiple network subgraphs and queries (#74) * Allow multiple network subgraphs and queries * Default network subgraph query * Add docstring to network subgraph config --- backend/crates/common/src/config.rs | 88 ++++++++++++------- .../crates/common/src/indexer/real_indexer.rs | 3 + backend/crates/common/src/network_subgraph.rs | 76 ++++++++++++++-- backend/crates/cross-checker/src/main.rs | 30 ++++++- .../{testnet-indexer.yml => network.yml} | 15 +++- .../{testnet-indexer.yml => network.yml} | 2 +- 6 files changed, 169 insertions(+), 45 deletions(-) rename ops/compose/graphix/{testnet-indexer.yml => network.yml} (61%) rename ops/compose/{testnet-indexer.yml => network.yml} (97%) diff --git a/backend/crates/common/src/config.rs b/backend/crates/common/src/config.rs index 8ef969b..17b89b8 100644 --- a/backend/crates/common/src/config.rs +++ b/backend/crates/common/src/config.rs @@ -9,10 +9,13 @@ use crate::{ network_subgraph::NetworkSubgraph, }; +/// A [`serde`]-compatible representation of Graphix's YAML configuration file. #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Config { pub database_url: String, + #[serde(default = "Config::default_prometheus_port")] + pub prometheus_port: u16, pub sources: Vec, #[serde(default)] pub block_choice_policy: BlockChoicePolicy, @@ -24,24 +27,8 @@ pub struct Config { impl Config { pub fn read(path: &Path) -> anyhow::Result { let file = File::open(path)?; - let config: Self = serde_yaml::from_reader(file) - .map_err(|e| anyhow::Error::new(e).context("invalid config file"))?; - - let num_network_subgraph_sources = config - .sources - .iter() - .filter(|c| match c { - ConfigSource::NetworkSubgraph(_) => true, - _ => false, - }) - .count(); - - // Validation: there can only be one network subgraph source, at most. - if num_network_subgraph_sources > 1 { - anyhow::bail!("there can only be one network subgraph source"); - } - - Ok(config) + Ok(serde_yaml::from_reader(file) + .map_err(|e| anyhow::Error::new(e).context("invalid config file"))?) } pub fn indexers(&self) -> Vec { @@ -77,26 +64,24 @@ impl Config { .collect() } - pub fn network_subgraph(&self) -> Option { - let network_subgraphs: Vec = self - .sources + pub fn network_subgraphs(&self) -> Vec { + self.sources .iter() .filter_map(|source| match source { ConfigSource::NetworkSubgraph(config) => Some(config), _ => None, }) .cloned() - .collect(); - - // This was already checked by [`Config::read`], it's just some - // defensive programming. - debug_assert!(network_subgraphs.len() <= 1); - network_subgraphs.into_iter().next() + .collect() } fn default_polling_period_in_seconds() -> u64 { 120 } + + fn default_prometheus_port() -> u16 { + 9184 + } } #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash, Ord, PartialOrd)] @@ -124,10 +109,27 @@ pub struct IndexerByAddressConfig { #[serde(rename_all = "camelCase")] pub struct NetworkSubgraphConfig { pub endpoint: String, + /// What query out of several available ones to use to fetch the list of + /// indexers from the network subgraph? + #[serde(default)] + pub query: NetworkSubgraphQuery, pub stake_threshold: f64, pub limit: Option, } +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum NetworkSubgraphQuery { + ByAllocations, + ByStakedTokens, +} + +impl Default for NetworkSubgraphQuery { + fn default() -> Self { + NetworkSubgraphQuery::ByAllocations + } +} + #[derive(Clone, Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct InterceptorConfig { @@ -173,26 +175,44 @@ pub async fn config_to_indexers(config: Config) -> anyhow::Result { + network_subgraph.indexers_by_allocations().await? + } + NetworkSubgraphQuery::ByStakedTokens => { + network_subgraph.indexers_by_staked_tokens().await? + } + }; if let Some(limit) = config.limit { network_subgraph_indexers.truncate(limit as usize); } - info!("Configuring network subgraph"); indexers.extend(network_subgraph_indexers); } - // Then, configure indexers by address, which requires access to the network subgraph. + info!( + indexer_count = indexers.len(), + "Configured all network subgraphs" + ); + + // Then, configure indexers by address, which requires access to a network subgraph. for indexer_config in config.indexers_by_address() { + // FIXME: when looking up indexers by address, we don't really know + // which network subgraph to use for the lookup. Should this be + // indicated inside the data source's configuration? Should we try all + // network subgraphs until one succeeds? let network_subgraph = NetworkSubgraph::new( config - .network_subgraph() + .network_subgraphs() + .get(0) .ok_or_else(|| anyhow::anyhow!("indexer by address requires a network subgraph"))? - .endpoint, + .endpoint + .clone(), ); let indexer = network_subgraph .indexer_by_address(&indexer_config.address) diff --git a/backend/crates/common/src/indexer/real_indexer.rs b/backend/crates/common/src/indexer/real_indexer.rs index 71b021b..fd2486b 100644 --- a/backend/crates/common/src/indexer/real_indexer.rs +++ b/backend/crates/common/src/indexer/real_indexer.rs @@ -15,6 +15,8 @@ use crate::{ types::{BlockPointer, IndexingStatus, PoiRequest, ProofOfIndexing, SubgraphDeployment}, }; +const REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); + #[derive(Debug)] pub struct RealIndexer { id: String, // Assumed to be unique accross all indexers @@ -57,6 +59,7 @@ impl RealIndexer { let response_raw = self .client .post(self.urls.status.clone()) + .timeout(REQUEST_TIMEOUT) .json(&request) .send() .await?; diff --git a/backend/crates/common/src/network_subgraph.rs b/backend/crates/common/src/network_subgraph.rs index fd299aa..7f4b025 100644 --- a/backend/crates/common/src/network_subgraph.rs +++ b/backend/crates/common/src/network_subgraph.rs @@ -1,5 +1,6 @@ #![allow(dead_code)] +use anyhow::anyhow; use reqwest::Url; use serde_derive::{Deserialize, Serialize}; use std::{collections::BTreeMap, sync::Arc}; @@ -17,6 +18,15 @@ pub struct NetworkSubgraph { } impl NetworkSubgraph { + const INDEXERS_BY_STAKED_TOKENS_QUERY: &str = r#" + { + indexers(orderBy: stakedTokens) { + id + url + } + } + "#; + const DEPLOYMENTS_QUERY: &str = r#" { subgraphDeployments(where: { indexerAllocations_: { status_in: [Active] }}, orderBy:stakedTokens) { @@ -47,7 +57,54 @@ impl NetworkSubgraph { } } - pub async fn indexers(&self) -> anyhow::Result>> { + pub async fn indexers_by_staked_tokens(&self) -> anyhow::Result>> { + let request = GraphqlRequest { + query: Self::INDEXERS_BY_STAKED_TOKENS_QUERY.to_string(), + variables: BTreeMap::new(), // Our query doesn't require any variables. + }; + + let res: GraphqlResponse = self + .client + .post(&self.endpoint) + .json(&request) + .send() + .await? + .error_for_status()? + .json() + .await?; + + let errors = res.errors.unwrap_or_default(); + if !errors.is_empty() { + return Err(anyhow::anyhow!( + "error(s) querying top indexers from the network subgraph: {}", + serde_json::to_string(&errors)? + )); + } + + // Unwrap: A response that has no errors must contain data. + let data = res.data.unwrap(); + let data_deserialized: GraphqlResponseTopIndexers = serde_json::from_value(data)?; + + let mut indexers: Vec> = vec![]; + for indexer in data_deserialized.indexers { + let indexer_id = indexer.id.clone(); + let real_indexer = + indexer_allocation_data_to_real_indexer(IndexerAllocation { indexer }); + + match real_indexer { + Ok(indexer) => indexers.push(Arc::new(indexer)), + Err(e) => warn!( + err = %e.to_string(), + indexer_id, + "Received bad indexer for network subgraph query; ignoring", + ), + } + } + + Ok(indexers) + } + + pub async fn indexers_by_allocations(&self) -> anyhow::Result>> { let sg_deployments = self.subgraph_deployments().await?; let mut indexers: Vec> = vec![]; @@ -153,7 +210,7 @@ impl NetworkSubgraph { // Unwrap: A response that has no errors must contain data. let data = res.data.unwrap(); - let data_deserialized: GraphqlResponseData = serde_json::from_value(data)?; + let data_deserialized: GraphqlResponseSgDeployments = serde_json::from_value(data)?; //let page: Vec = page // .into_iter() // .map(|raw_deployment| SubgraphDeployment { @@ -177,7 +234,10 @@ fn indexer_allocation_data_to_real_indexer( indexer_allocation: IndexerAllocation, ) -> anyhow::Result { let indexer = indexer_allocation.indexer; - let mut url: Url = indexer.url.parse()?; + let mut url: Url = indexer + .url + .ok_or_else(|| anyhow!("Indexer without URL"))? + .parse()?; // FIXME: we're unable to connect to indexers over HTTPS inside // docker-compose for now. url.set_scheme("http") @@ -204,10 +264,16 @@ struct GraphqlResponse { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] -struct GraphqlResponseData { +struct GraphqlResponseSgDeployments { subgraph_deployments: Vec, } +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct GraphqlResponseTopIndexers { + indexers: Vec, +} + #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct SubgraphDeployment { @@ -223,7 +289,7 @@ pub struct IndexerAllocation { #[derive(Debug, Deserialize)] pub struct Indexer { pub id: String, - pub url: String, + pub url: Option, } impl NetworkSubgraph {} diff --git a/backend/crates/cross-checker/src/main.rs b/backend/crates/cross-checker/src/main.rs index 68483c5..343253e 100644 --- a/backend/crates/cross-checker/src/main.rs +++ b/backend/crates/cross-checker/src/main.rs @@ -11,6 +11,7 @@ use graphix_common::queries::{query_indexing_statuses, query_proofs_of_indexing} use graphix_common::PrometheusExporter; use graphix_common::{config, store}; use prometheus_exporter::prometheus; +use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -27,17 +28,20 @@ async fn main() -> anyhow::Result<()> { info!("Parse options"); let cli_options = CliOptions::parse(); - info!("Load configuration file"); + info!("Loading configuration file"); let config = Config::read(&cli_options.config)?; + info!("Initialize store and running migrations"); let store = store::Store::new(&config.database_url).await?; + info!("Store initialization successful"); let sleep_duration = Duration::from_secs(config.polling_period_in_seconds); // Prometheus metrics. let registry = prometheus::default_registry().clone(); - let _exporter = PrometheusExporter::start(9184, registry.clone()).unwrap(); + let _exporter = PrometheusExporter::start(config.prometheus_port, registry.clone()).unwrap(); + info!("Initializing bisect request handler"); let store_clone = store.clone(); let (tx_indexers, rx_indexers) = watch::channel(vec![]); tokio::spawn(async move { @@ -48,9 +52,13 @@ async fn main() -> anyhow::Result<()> { loop { info!("New main loop iteration"); - info!("Initialize inputs (indexers, indexing statuses etc.)"); - let indexers = config::config_to_indexers(config.clone()).await?; + + let mut indexers = config::config_to_indexers(config.clone()).await?; + // Different data sources, especially network subgraphs, result in + // duplicate indexers. + indexers = deduplicate_indexers(&indexers); + tx_indexers.send(indexers.clone())?; let indexing_statuses = query_indexing_statuses(indexers).await; @@ -58,6 +66,8 @@ async fn main() -> anyhow::Result<()> { info!("Monitor proofs of indexing"); let pois = query_proofs_of_indexing(indexing_statuses, config.block_choice_policy).await; + info!(pois = pois.len(), "Finished tracking PoIs"); + let write_err = store.write_pois(&pois, store::PoiLiveness::Live).err(); if let Some(err) = write_err { error!(error = %err, "Failed to write POIs to database"); @@ -75,6 +85,18 @@ fn init_tracing() { tracing_subscriber::fmt::init(); } +fn deduplicate_indexers(indexers: &[Arc]) -> Vec> { + let mut seen = HashSet::new(); + let mut deduplicated = vec![]; + for indexer in indexers { + if !seen.contains(indexer.id()) { + deduplicated.push(indexer.clone()); + seen.insert(indexer.id().to_string()); + } + } + deduplicated +} + #[derive(Parser, Debug)] struct CliOptions { #[clap(long)] diff --git a/ops/compose/graphix/testnet-indexer.yml b/ops/compose/graphix/network.yml similarity index 61% rename from ops/compose/graphix/testnet-indexer.yml rename to ops/compose/graphix/network.yml index c81c78b..a22f14b 100644 --- a/ops/compose/graphix/testnet-indexer.yml +++ b/ops/compose/graphix/network.yml @@ -1,3 +1,5 @@ +# A good mix of data sources for local development, both mainnet and testnet. + databaseUrl: postgres://graphix:password@postgres-graphix:5432/graphix sources: - type: indexer @@ -10,8 +12,19 @@ sources: status: http://graph-testnet.ellipfra.net/status - type: networkSubgraph endpoint: https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-goerli + query: byAllocations + stakeThreshold: 0.0 + limit: 20 # Disabled for now + - type: networkSubgraph + endpoint: https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-arbitrum + query: byAllocations + stakeThreshold: 0.0 + limit: 20 + - type: networkSubgraph + endpoint: https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-arbitrum + query: byStakedTokens stakeThreshold: 0.0 - limit: 0 # Disabled for now + limit: 40 # - type: indexerByAddress # address: "0x56577167dcdd1a3de2e58d53fc2be0b622d82a7c" # Example interceptors diff --git a/ops/compose/testnet-indexer.yml b/ops/compose/network.yml similarity index 97% rename from ops/compose/testnet-indexer.yml rename to ops/compose/network.yml index 7c01815..a099633 100644 --- a/ops/compose/testnet-indexer.yml +++ b/ops/compose/network.yml @@ -48,7 +48,7 @@ services: - "9184:9184" volumes: - ./graphix/:/config/ - command: ["--config", "/config/testnet-indexer.yml"] + command: ["--config", "/config/network.yml"] graphix-api-server: image: edgeandnode/graphix-api-server