Allow multiple network subgraphs and queries (#74)
* Allow multiple network subgraphs and queries

* Default network subgraph query

* Add docstring to network subgraph config
neysofu authored Jul 31, 2023
1 parent 209ce36 commit 7cacbdb
Showing 6 changed files with 169 additions and 45 deletions.
88 changes: 54 additions & 34 deletions backend/crates/common/src/config.rs
@@ -9,10 +9,13 @@ use crate::{
     network_subgraph::NetworkSubgraph,
 };
 
+/// A [`serde`]-compatible representation of Graphix's YAML configuration file.
 #[derive(Debug, Clone, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Config {
     pub database_url: String,
+    #[serde(default = "Config::default_prometheus_port")]
+    pub prometheus_port: u16,
     pub sources: Vec<ConfigSource>,
     #[serde(default)]
     pub block_choice_policy: BlockChoicePolicy,
@@ -24,24 +27,8 @@ pub struct Config {
 impl Config {
     pub fn read(path: &Path) -> anyhow::Result<Self> {
         let file = File::open(path)?;
-        let config: Self = serde_yaml::from_reader(file)
-            .map_err(|e| anyhow::Error::new(e).context("invalid config file"))?;
-
-        let num_network_subgraph_sources = config
-            .sources
-            .iter()
-            .filter(|c| match c {
-                ConfigSource::NetworkSubgraph(_) => true,
-                _ => false,
-            })
-            .count();
-
-        // Validation: there can only be one network subgraph source, at most.
-        if num_network_subgraph_sources > 1 {
-            anyhow::bail!("there can only be one network subgraph source");
-        }
-
-        Ok(config)
+        Ok(serde_yaml::from_reader(file)
+            .map_err(|e| anyhow::Error::new(e).context("invalid config file"))?)
     }
 
     pub fn indexers(&self) -> Vec<IndexerConfig> {
@@ -77,26 +64,24 @@ impl Config {
             .collect()
     }
 
-    pub fn network_subgraph(&self) -> Option<NetworkSubgraphConfig> {
-        let network_subgraphs: Vec<NetworkSubgraphConfig> = self
-            .sources
+    pub fn network_subgraphs(&self) -> Vec<NetworkSubgraphConfig> {
+        self.sources
             .iter()
             .filter_map(|source| match source {
                 ConfigSource::NetworkSubgraph(config) => Some(config),
                 _ => None,
             })
             .cloned()
-            .collect();
-
-        // This was already checked by [`Config::read`], it's just some
-        // defensive programming.
-        debug_assert!(network_subgraphs.len() <= 1);
-        network_subgraphs.into_iter().next()
+            .collect()
     }
 
     fn default_polling_period_in_seconds() -> u64 {
         120
     }
+
+    fn default_prometheus_port() -> u16 {
+        9184
+    }
 }
 
 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash, Ord, PartialOrd)]
Expand Down Expand Up @@ -124,10 +109,27 @@ pub struct IndexerByAddressConfig {
#[serde(rename_all = "camelCase")]
pub struct NetworkSubgraphConfig {
pub endpoint: String,
/// What query out of several available ones to use to fetch the list of
/// indexers from the network subgraph?
#[serde(default)]
pub query: NetworkSubgraphQuery,
pub stake_threshold: f64,
pub limit: Option<u32>,
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum NetworkSubgraphQuery {
ByAllocations,
ByStakedTokens,
}

impl Default for NetworkSubgraphQuery {
fn default() -> Self {
NetworkSubgraphQuery::ByAllocations
}
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InterceptorConfig {
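
To illustrate the new configuration surface, here is a minimal sketch of a YAML file with two network subgraph sources, driven through `serde_yaml` in a test. The endpoint URLs are made up, and the `networkSubgraph:` tagging is an assumption, since `ConfigSource`'s definition is not part of this diff:

```rust
// Sketch only; assumes `ConfigSource` is an externally tagged enum whose
// network subgraph variant appears as `networkSubgraph` in YAML. Endpoint
// URLs are hypothetical.
#[test]
fn parses_two_network_subgraph_sources() {
    let yaml = r#"
databaseUrl: postgres://localhost/graphix
sources:
  - networkSubgraph:
      endpoint: https://example.com/subgraphs/graph-network-a
      stakeThreshold: 0.0
  - networkSubgraph:
      endpoint: https://example.com/subgraphs/graph-network-b
      query: byStakedTokens
      stakeThreshold: 0.0
      limit: 100
"#;
    let config: Config = serde_yaml::from_str(yaml).unwrap();
    // `Config::read` no longer rejects multiple network subgraph sources.
    assert_eq!(config.network_subgraphs().len(), 2);
}
```

With the old validation removed from `Config::read`, both sources survive parsing and `network_subgraphs()` returns them in declaration order.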
@@ -173,26 +175,44 @@ pub async fn config_to_indexers(config: Config) -> anyhow::Result<Vec<Arc<dyn In
         indexers.push(Arc::new(RealIndexer::new(config.clone())));
     }
 
-    // Then, configure the network subgraph, if required, resulting in "dynamic"
+    // Then, configure the network subgraphs, if required, resulting in "dynamic"
     // indexers.
-    if let Some(config) = config.network_subgraph() {
+    for config in config.network_subgraphs() {
+        info!(endpoint = %config.endpoint, "Configuring network subgraph");
         let network_subgraph = NetworkSubgraph::new(config.endpoint);
-        let mut network_subgraph_indexers = network_subgraph.indexers().await?;
+        let mut network_subgraph_indexers = match config.query {
+            NetworkSubgraphQuery::ByAllocations => {
+                network_subgraph.indexers_by_allocations().await?
+            }
+            NetworkSubgraphQuery::ByStakedTokens => {
+                network_subgraph.indexers_by_staked_tokens().await?
+            }
+        };
         if let Some(limit) = config.limit {
             network_subgraph_indexers.truncate(limit as usize);
         }
 
-        info!("Configuring network subgraph");
         indexers.extend(network_subgraph_indexers);
     }
 
-    // Then, configure indexers by address, which requires access to the network subgraph.
+    info!(
+        indexer_count = indexers.len(),
+        "Configured all network subgraphs"
+    );
+
+    // Then, configure indexers by address, which requires access to a network subgraph.
     for indexer_config in config.indexers_by_address() {
+        // FIXME: when looking up indexers by address, we don't really know
+        // which network subgraph to use for the lookup. Should this be
+        // indicated inside the data source's configuration? Should we try all
+        // network subgraphs until one succeeds?
         let network_subgraph = NetworkSubgraph::new(
             config
-                .network_subgraph()
+                .network_subgraphs()
+                .get(0)
                 .ok_or_else(|| anyhow::anyhow!("indexer by address requires a network subgraph"))?
-                .endpoint,
+                .endpoint
+                .clone(),
         );
         let indexer = network_subgraph
             .indexer_by_address(&indexer_config.address)
3 changes: 3 additions & 0 deletions backend/crates/common/src/indexer/real_indexer.rs
@@ -15,6 +15,8 @@ use crate::{
     types::{BlockPointer, IndexingStatus, PoiRequest, ProofOfIndexing, SubgraphDeployment},
 };
 
+const REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
+
 #[derive(Debug)]
 pub struct RealIndexer {
     id: String, // Assumed to be unique across all indexers
@@ -57,6 +59,7 @@ impl RealIndexer {
         let response_raw = self
             .client
             .post(self.urls.status.clone())
+            .timeout(REQUEST_TIMEOUT)
             .json(&request)
             .send()
             .await?;
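
A note on the new `REQUEST_TIMEOUT`: `reqwest` also supports a client-wide default timeout, and a per-request timeout like the one added above overrides it for that request only. A self-contained sketch, with a hypothetical status URL:

```rust
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // A default that applies to every request made through this client...
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(60))
        .build()?;

    // ...unless the request sets its own timeout, which takes precedence,
    // mirroring the `.timeout(REQUEST_TIMEOUT)` call in the diff above.
    let _response = client
        .post("https://indexer.example.com/status") // hypothetical endpoint
        .timeout(Duration::from_secs(30))
        .send()
        .await?;

    Ok(())
}
```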
76 changes: 71 additions & 5 deletions backend/crates/common/src/network_subgraph.rs
@@ -1,5 +1,6 @@
 #![allow(dead_code)]
 
+use anyhow::anyhow;
 use reqwest::Url;
 use serde_derive::{Deserialize, Serialize};
 use std::{collections::BTreeMap, sync::Arc};
@@ -17,6 +18,15 @@ pub struct NetworkSubgraph {
 }
 
 impl NetworkSubgraph {
+    const INDEXERS_BY_STAKED_TOKENS_QUERY: &str = r#"
+        {
+            indexers(orderBy: stakedTokens) {
+                id
+                url
+            }
+        }
+    "#;
+
     const DEPLOYMENTS_QUERY: &str = r#"
         {
             subgraphDeployments(where: { indexerAllocations_: { status_in: [Active] }}, orderBy:stakedTokens) {
@@ -47,7 +57,54 @@ impl NetworkSubgraph {
         }
     }
 
-    pub async fn indexers(&self) -> anyhow::Result<Vec<Arc<dyn IndexerTrait>>> {
+    pub async fn indexers_by_staked_tokens(&self) -> anyhow::Result<Vec<Arc<dyn IndexerTrait>>> {
+        let request = GraphqlRequest {
+            query: Self::INDEXERS_BY_STAKED_TOKENS_QUERY.to_string(),
+            variables: BTreeMap::new(), // Our query doesn't require any variables.
+        };
+
+        let res: GraphqlResponse = self
+            .client
+            .post(&self.endpoint)
+            .json(&request)
+            .send()
+            .await?
+            .error_for_status()?
+            .json()
+            .await?;
+
+        let errors = res.errors.unwrap_or_default();
+        if !errors.is_empty() {
+            return Err(anyhow::anyhow!(
+                "error(s) querying top indexers from the network subgraph: {}",
+                serde_json::to_string(&errors)?
+            ));
+        }
+
+        // Unwrap: A response that has no errors must contain data.
+        let data = res.data.unwrap();
+        let data_deserialized: GraphqlResponseTopIndexers = serde_json::from_value(data)?;
+
+        let mut indexers: Vec<Arc<dyn IndexerTrait>> = vec![];
+        for indexer in data_deserialized.indexers {
+            let indexer_id = indexer.id.clone();
+            let real_indexer =
+                indexer_allocation_data_to_real_indexer(IndexerAllocation { indexer });
+
+            match real_indexer {
+                Ok(indexer) => indexers.push(Arc::new(indexer)),
+                Err(e) => warn!(
+                    err = %e.to_string(),
+                    indexer_id,
+                    "Received bad indexer for network subgraph query; ignoring",
+                ),
+            }
+        }
+
+        Ok(indexers)
+    }
+
+    pub async fn indexers_by_allocations(&self) -> anyhow::Result<Vec<Arc<dyn IndexerTrait>>> {
         let sg_deployments = self.subgraph_deployments().await?;
 
         let mut indexers: Vec<Arc<dyn IndexerTrait>> = vec![];
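
Taken together, the two discovery strategies can be exercised like this inside an async context (sketch only; the endpoint URL is made up):

```rust
// Sketch: the two indexer-discovery strategies side by side.
let network_subgraph =
    NetworkSubgraph::new("https://example.com/subgraphs/graph-network".to_string());

// New dedicated query: indexers ordered by staked tokens. Combined with the
// `limit` config option, this keeps only the top N indexers returned.
let by_stake = network_subgraph.indexers_by_staked_tokens().await?;

// Pre-existing behavior under a more descriptive name: indexers discovered
// through active allocations on subgraph deployments.
let by_allocations = network_subgraph.indexers_by_allocations().await?;
```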
@@ -153,7 +210,7 @@ impl NetworkSubgraph {
 
         // Unwrap: A response that has no errors must contain data.
         let data = res.data.unwrap();
-        let data_deserialized: GraphqlResponseData = serde_json::from_value(data)?;
+        let data_deserialized: GraphqlResponseSgDeployments = serde_json::from_value(data)?;
         //let page: Vec<SubgraphDeployment> = page
         //    .into_iter()
         //    .map(|raw_deployment| SubgraphDeployment {
@@ -177,7 +234,10 @@ fn indexer_allocation_data_to_real_indexer(
     indexer_allocation: IndexerAllocation,
 ) -> anyhow::Result<RealIndexer> {
     let indexer = indexer_allocation.indexer;
-    let mut url: Url = indexer.url.parse()?;
+    let mut url: Url = indexer
+        .url
+        .ok_or_else(|| anyhow!("Indexer without URL"))?
+        .parse()?;
     // FIXME: we're unable to connect to indexers over HTTPS inside
     // docker-compose for now.
     url.set_scheme("http")
@@ -204,10 +264,16 @@ struct GraphqlResponse {
 
 #[derive(Deserialize)]
 #[serde(rename_all = "camelCase")]
-struct GraphqlResponseData {
+struct GraphqlResponseSgDeployments {
     subgraph_deployments: Vec<SubgraphDeployment>,
 }
 
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct GraphqlResponseTopIndexers {
+    indexers: Vec<Indexer>,
+}
+
 #[derive(Deserialize, Debug)]
 #[serde(rename_all = "camelCase")]
 pub struct SubgraphDeployment {
@@ -223,7 +289,7 @@ pub struct IndexerAllocation {
 
 #[derive(Debug, Deserialize)]
 pub struct Indexer {
     pub id: String,
-    pub url: String,
+    pub url: Option<String>,
 }
 
 impl NetworkSubgraph {}
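
For reference, a sketch of response data that the new `GraphqlResponseTopIndexers` type accepts, showing why `url` became `Option<String>` (the indexer IDs and URL are invented):

```rust
// Sketch: hand-written response data for the new deserialization types.
let data = serde_json::json!({
    "indexers": [
        { "id": "0x01", "url": "https://indexer-a.example.com/" },
        { "id": "0x02", "url": null }
    ]
});
let parsed: GraphqlResponseTopIndexers = serde_json::from_value(data).unwrap();
assert!(parsed.indexers[1].url.is_none()); // a null `url` no longer fails parsing
```

With the previous `url: String`, a single indexer with a null URL would have failed deserialization of the whole response; now such indexers are skipped individually via the `anyhow!("Indexer without URL")` path above.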
30 changes: 26 additions & 4 deletions backend/crates/cross-checker/src/main.rs
@@ -11,6 +11,7 @@ use graphix_common::queries::{query_indexing_statuses, query_proofs_of_indexing}
 use graphix_common::PrometheusExporter;
 use graphix_common::{config, store};
 use prometheus_exporter::prometheus;
+use std::collections::HashSet;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;
@@ -27,17 +28,20 @@ async fn main() -> anyhow::Result<()> {
     info!("Parse options");
     let cli_options = CliOptions::parse();
 
-    info!("Load configuration file");
+    info!("Loading configuration file");
     let config = Config::read(&cli_options.config)?;
 
     info!("Initializing store and running migrations");
     let store = store::Store::new(&config.database_url).await?;
+    info!("Store initialization successful");
 
     let sleep_duration = Duration::from_secs(config.polling_period_in_seconds);
 
     // Prometheus metrics.
     let registry = prometheus::default_registry().clone();
-    let _exporter = PrometheusExporter::start(9184, registry.clone()).unwrap();
+    let _exporter = PrometheusExporter::start(config.prometheus_port, registry.clone()).unwrap();
+
+    info!("Initializing bisect request handler");
     let store_clone = store.clone();
     let (tx_indexers, rx_indexers) = watch::channel(vec![]);
     tokio::spawn(async move {
@@ -48,16 +52,22 @@
 
     loop {
         info!("New main loop iteration");
 
         info!("Initialize inputs (indexers, indexing statuses etc.)");
-        let indexers = config::config_to_indexers(config.clone()).await?;
+
+        let mut indexers = config::config_to_indexers(config.clone()).await?;
+        // Different data sources, especially network subgraphs, can result in
+        // duplicate indexers.
+        indexers = deduplicate_indexers(&indexers);
 
         tx_indexers.send(indexers.clone())?;
 
         let indexing_statuses = query_indexing_statuses(indexers).await;
 
         info!("Monitor proofs of indexing");
         let pois = query_proofs_of_indexing(indexing_statuses, config.block_choice_policy).await;
 
+        info!(pois = pois.len(), "Finished tracking PoIs");
+
         let write_err = store.write_pois(&pois, store::PoiLiveness::Live).err();
         if let Some(err) = write_err {
             error!(error = %err, "Failed to write POIs to database");
@@ -75,6 +85,18 @@ fn init_tracing() {
     tracing_subscriber::fmt::init();
 }
 
+fn deduplicate_indexers(indexers: &[Arc<dyn Indexer>]) -> Vec<Arc<dyn Indexer>> {
+    let mut seen = HashSet::new();
+    let mut deduplicated = vec![];
+    for indexer in indexers {
+        if !seen.contains(indexer.id()) {
+            deduplicated.push(indexer.clone());
+            seen.insert(indexer.id().to_string());
+        }
+    }
+    deduplicated
+}
+
 #[derive(Parser, Debug)]
 struct CliOptions {
     #[clap(long)]
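
`deduplicate_indexers` keeps the first occurrence of each indexer ID and preserves input order. The same pattern in miniature, with made-up string IDs:

```rust
use std::collections::HashSet;

fn main() {
    // `HashSet::insert` returns false for values it has already seen, so the
    // first occurrence of each ID wins and input order is preserved.
    let ids = ["0xaaa", "0xbbb", "0xaaa", "0xccc"];
    let mut seen = HashSet::new();
    let deduplicated: Vec<&str> = ids
        .iter()
        .copied()
        .filter(|id| seen.insert(*id))
        .collect();
    assert_eq!(deduplicated, ["0xaaa", "0xbbb", "0xccc"]);
}
```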