diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2b718ca..ab80c8d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,6 +13,7 @@ on: env: CARGO_TERM_COLOR: always RUST_BACKTRACE: full + REGISTRY: ghcr.io jobs: rustfmt: @@ -31,48 +32,44 @@ jobs: toolchain: ${{ matrix.rust }} components: rustfmt override: true - - name: Check formating + - name: Check formatting uses: actions-rs/cargo@v1 with: command: fmt args: --all -- --check - - docker-api-server: - runs-on: ubuntu-latest/Users/rotarur/projects/edgeandnode/graph-infra/.github/workflows/template-deploy.yaml + docker-build-and-push: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + if: github.ref == 'refs/heads/main' steps: + - name: Checkout repository + uses: actions/checkout@v3 - name: Set up QEMU uses: docker/setup-qemu-action@v2 - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 - #- name: Login to Docker Hub - # uses: docker/login-action@v2 - # with: - # username: ${{ secrets.DOCKERHUB_USERNAME }} - # password: ${{ secrets.DOCKERHUB_TOKEN }} - - name: docker build + - name: Log in to the Container registry + uses: docker/login-action@v2 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + - name: Build and push API server Docker image uses: docker/build-push-action@v4 with: - push: false - tags: edgeandnode/graphix-api-server:latest + context: . file: ops/api-server.dockerfile - docker-cross-checker: - runs-on: ubuntu-latest - steps: - - name: Set up QEMU - uses: docker/setup-qemu-action@v2 - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 - #- name: Login to Docker Hub - # uses: docker/login-action@v2 - # with: - # username: ${{ secrets.DOCKERHUB_USERNAME }} - # password: ${{ secrets.DOCKERHUB_TOKEN }} - - name: docker build + push: true + tags: ghcr.io/${{ github.repository_owner }}/graphix-api-server:latest + - name: Build and push cross-checker service Docker image uses: docker/build-push-action@v4 with: - push: false - tags: edgeandnode/graphix-cross-checker:latest - file: ops/cross-checker.dockerfile + context: . + file: ops/api-server.dockerfile + push: true + tags: ghcr.io/${{ github.repository_owner }}/graphix-cross-checker:latest build: name: Build diff --git a/.github/workflows/template-deploy.yaml b/.github/workflows/template-deploy.yaml index 67909b5..1e63996 100644 --- a/.github/workflows/template-deploy.yaml +++ b/.github/workflows/template-deploy.yaml @@ -63,7 +63,7 @@ jobs: - name: Diff ${{ inputs.ENVIRONMENT }} if: inputs.ENABLE_DIFF working-directory: k8s - run: kubectl diff -k ${{ inputs.ENVIRONMENT }} + run: kubectl diff -k ${{ inputs.ENVIRONMENT }} | true - name: Deploy to ${{ inputs.ENVIRONMENT }} cluster if: inputs.ENABLE_APPLY diff --git a/backend/crates/api-server/src/main.rs b/backend/crates/api-server/src/main.rs index ac324c3..aa99a99 100644 --- a/backend/crates/api-server/src/main.rs +++ b/backend/crates/api-server/src/main.rs @@ -5,7 +5,7 @@ use async_graphql::{ use async_graphql_warp::{self, GraphQLResponse}; use clap::Parser; use graphix_common::{api_types as schema, store::Store}; -use std::{convert::Infallible, net::Ipv4Addr}; +use std::convert::Infallible; use warp::{ http::{self, Method}, Filter, @@ -59,9 +59,7 @@ async fn run_api_server(options: CliOptions, store: Store) { .or(graphql_playground_route) .or(graphql_route); - warp::serve(routes) - .run((Ipv4Addr::LOCALHOST, options.port)) - .await; + warp::serve(routes).run(([0, 0, 0, 0], options.port)).await; } fn init_tracing() { diff --git a/backend/crates/common/graphql/indexer/queries/indexing-statuses.gql b/backend/crates/common/graphql/indexer/queries/indexing-statuses.gql index c3724bd..adcb688 100644 --- a/backend/crates/common/graphql/indexer/queries/indexing-statuses.gql +++ b/backend/crates/common/graphql/indexer/queries/indexing-statuses.gql @@ -9,6 +9,9 @@ query IndexingStatuses { number hash } + earliestBlock { + number + } } } } diff --git a/backend/crates/common/src/block_choice.rs b/backend/crates/common/src/block_choice.rs new file mode 100644 index 0000000..47faba5 --- /dev/null +++ b/backend/crates/common/src/block_choice.rs @@ -0,0 +1,61 @@ +use crate::prelude::IndexingStatus; +use serde::Deserialize; + +#[derive(Copy, Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum BlockChoicePolicy { + // Use the earliest block that all indexers have in common + Earliest, + // Use the block that maximizes the total number of blocks synced across all indexers + MaxSyncedBlocks, +} + +impl Default for BlockChoicePolicy { + fn default() -> Self { + BlockChoicePolicy::MaxSyncedBlocks + } +} + +impl BlockChoicePolicy { + pub fn choose_block<'a>( + &self, + statuses: impl Iterator, + ) -> Option { + match self { + BlockChoicePolicy::Earliest => statuses + .map(|status| &status.latest_block.number) + .min() + .copied(), + BlockChoicePolicy::MaxSyncedBlocks => { + // Assuming that all statuses have the same `deployment` and `earliest_block_num`, + // this will return the block number that maximizes the total number of blocks + // synced across all indexers. + + let mut indexers_ascending: Vec<&'a IndexingStatus> = statuses.collect(); + indexers_ascending.sort_by_key(|status| status.latest_block.number); + + let mut max_utility = 0; + let mut best_block: Option = None; + + for (i, status) in indexers_ascending.iter().enumerate() { + let remaining_statuses = indexers_ascending.len() - i; + let block_number = status.latest_block.number; + if block_number < status.earliest_block_num { + // This status is inconsistent, ignore it, avoiding overflow. + continue; + } + + let utility = + remaining_statuses as u64 * (block_number - status.earliest_block_num); + + if utility > max_utility { + max_utility = utility; + best_block = Some(block_number); + } + } + + best_block + } + } + } +} diff --git a/backend/crates/common/src/config.rs b/backend/crates/common/src/config.rs index f2eea78..8ef969b 100644 --- a/backend/crates/common/src/config.rs +++ b/backend/crates/common/src/config.rs @@ -4,6 +4,7 @@ use std::{fs::File, path::Path, sync::Arc}; use tracing::info; use crate::{ + block_choice::BlockChoicePolicy, indexer::{Indexer, IndexerInterceptor, RealIndexer}, network_subgraph::NetworkSubgraph, }; @@ -13,6 +14,9 @@ use crate::{ pub struct Config { pub database_url: String, pub sources: Vec, + #[serde(default)] + pub block_choice_policy: BlockChoicePolicy, + #[serde(default = "Config::default_polling_period_in_seconds")] pub polling_period_in_seconds: u64, } diff --git a/backend/crates/common/src/indexer/interceptor.rs b/backend/crates/common/src/indexer/interceptor.rs index cce5bdf..207880b 100644 --- a/backend/crates/common/src/indexer/interceptor.rs +++ b/backend/crates/common/src/indexer/interceptor.rs @@ -50,6 +50,7 @@ impl Indexer for IndexerInterceptor { deployment: status.deployment, network: status.network, latest_block: status.latest_block, + earliest_block_num: status.earliest_block_num, }) .collect(); Ok(hijacked_statuses) diff --git a/backend/crates/common/src/indexer/real_indexer.rs b/backend/crates/common/src/indexer/real_indexer.rs index b4135ef..71b021b 100644 --- a/backend/crates/common/src/indexer/real_indexer.rs +++ b/backend/crates/common/src/indexer/real_indexer.rs @@ -148,9 +148,13 @@ impl Indexer for RealIndexer { ) -> Vec { let mut pois = vec![]; - // Graph Node implements a limit of 10 POI requests per request, so - // split our requests up accordingly. - for requests in requests.chunks(10) { + // Graph Node implements a limit of 10 POI requests per request, so split our requests up + // accordingly. + // + // FIXME: This is temporarily set to 1 until we fix the error: 'Null value resolved for + // non-null field `proofOfIndexing`' Which is probably a Graph Node bug. Setting it to 1 + // reduces the impact of this issue. + for requests in requests.chunks(1) { trace!( indexer = %self.id(), batch_size = requests.len(), @@ -302,18 +306,19 @@ mod gql_types { .get(0) .ok_or_else(|| anyhow!("chain status missing"))?; - let latest_block = match chain.on { + let (latest_block, earliest_block_num) = match &chain.on { indexing_statuses::IndexingStatusesIndexingStatusesChainsOn::EthereumIndexingStatus( indexing_statuses::IndexingStatusesIndexingStatusesChainsOnEthereumIndexingStatus { - ref latest_block, + latest_block, + earliest_block, .. }, - ) => match latest_block { - Some(block) => BlockPointer { + ) => match (latest_block, earliest_block) { + (Some(block), Some(earliest_block)) => (BlockPointer { number: block.number.parse()?, hash: Some(block.hash.clone().as_str().try_into()?), - }, - None => { + }, earliest_block.number.parse()?), + _ => { return Err(anyhow!("deployment has not started indexing yet")); } }, @@ -324,6 +329,7 @@ mod gql_types { deployment: SubgraphDeployment(self.inner.subgraph), network: chain.network.clone(), latest_block, + earliest_block_num, }) } } diff --git a/backend/crates/common/src/lib.rs b/backend/crates/common/src/lib.rs index 9dcc63b..87bdd42 100644 --- a/backend/crates/common/src/lib.rs +++ b/backend/crates/common/src/lib.rs @@ -1,4 +1,5 @@ pub mod api_types; +pub mod block_choice; pub mod config; mod indexer; pub mod network_subgraph; diff --git a/backend/crates/common/src/network_subgraph.rs b/backend/crates/common/src/network_subgraph.rs index 726e731..fd299aa 100644 --- a/backend/crates/common/src/network_subgraph.rs +++ b/backend/crates/common/src/network_subgraph.rs @@ -69,7 +69,6 @@ impl NetworkSubgraph { &self, address: &[u8], ) -> anyhow::Result> { - println!("address is {:?}", hex::encode(address)); let request = GraphqlRequest { query: Self::INDEXER_BY_ADDRESS_QUERY.to_string(), variables: BTreeMap::from_iter(vec![( diff --git a/backend/crates/common/src/queries.rs b/backend/crates/common/src/queries.rs index 5a4f5f8..fb42626 100644 --- a/backend/crates/common/src/queries.rs +++ b/backend/crates/common/src/queries.rs @@ -1,7 +1,6 @@ +use crate::block_choice::BlockChoicePolicy; use crate::indexer::Indexer; -use crate::prelude::{ - BlockPointer, IndexingStatus, PoiRequest, ProofOfIndexing, SubgraphDeployment, -}; +use crate::prelude::{IndexingStatus, PoiRequest, ProofOfIndexing, SubgraphDeployment}; use crate::prometheus_metrics::metrics; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -74,6 +73,7 @@ pub async fn query_indexing_statuses(indexers: Vec>) -> Vec, + block_choice_policy: BlockChoicePolicy, ) -> Vec { info!("Query POIs for recent common blocks across indexers"); @@ -102,17 +102,13 @@ pub async fn query_proofs_of_indexing( ) })); - // For each deployment, identify the latest block number that all indexers have in common - let latest_blocks: HashMap> = + // For each deployment, chooose a block on which to query the PoI + let latest_blocks: HashMap> = HashMap::from_iter(deployments.iter().map(|deployment| { ( deployment.clone(), statuses_by_deployment.get(deployment).and_then(|statuses| { - statuses - .iter() - .map(|status| &status.latest_block) - .min_by_key(|block| block.number) - .cloned() + block_choice_policy.choose_block(statuses.iter().map(|&s| s)) }), ) })); @@ -123,17 +119,20 @@ pub async fn query_proofs_of_indexing( .map(|indexer| async { let poi_requests = latest_blocks .iter() - .filter(|(deployment, _)| { + .filter(|(deployment, &block_number)| { statuses_by_deployment .get(*deployment) .expect("bug in matching deployments to latest blocks and indexers") .iter() - .any(|status| status.indexer.eq(indexer)) + .any(|status| { + status.indexer.eq(indexer) + && Some(status.latest_block.number) >= block_number + }) }) - .filter_map(|(deployment, block)| { - block.clone().map(|block| PoiRequest { + .filter_map(|(deployment, block_number)| { + block_number.map(|block_number| PoiRequest { deployment: deployment.clone(), - block_number: block.number, + block_number: block_number, }) }) .collect::>(); diff --git a/backend/crates/common/src/store/tests.rs b/backend/crates/common/src/store/tests.rs index 65ca9af..337e56d 100644 --- a/backend/crates/common/src/store/tests.rs +++ b/backend/crates/common/src/store/tests.rs @@ -1,3 +1,4 @@ +use crate::block_choice::BlockChoicePolicy; use crate::tests::{fast_rng, gen::gen_indexers}; use crate::{ queries, @@ -16,7 +17,8 @@ async fn poi_db_roundtrip() { let indexers = gen_indexers(&mut rng, 100); let indexing_statuses = queries::query_indexing_statuses(indexers).await; - let pois = queries::query_proofs_of_indexing(indexing_statuses).await; + let pois = + queries::query_proofs_of_indexing(indexing_statuses, BlockChoicePolicy::Earliest).await; let store = Store::new(&test_db_url()).await.unwrap(); let mut conn = store.test_conn(); diff --git a/backend/crates/common/src/tests/gen.rs b/backend/crates/common/src/tests/gen.rs index 4bbf724..f90e7ac 100644 --- a/backend/crates/common/src/tests/gen.rs +++ b/backend/crates/common/src/tests/gen.rs @@ -87,6 +87,7 @@ where network: "mainnet".into(), latest_block: blocks.iter().choose(&mut rng).unwrap().clone(), canonical_pois: gen_pois(blocks.clone(), &mut rng), + earliest_block_num: blocks[0].number, }) .collect(); diff --git a/backend/crates/common/src/tests/mocks.rs b/backend/crates/common/src/tests/mocks.rs index ce572b4..278b971 100644 --- a/backend/crates/common/src/tests/mocks.rs +++ b/backend/crates/common/src/tests/mocks.rs @@ -13,6 +13,7 @@ pub struct DeploymentDetails { pub network: String, pub latest_block: BlockPointer, pub canonical_pois: Vec, + pub earliest_block_num: u64, } #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -45,6 +46,7 @@ impl Indexer for MockIndexer { deployment: details.deployment, network: details.network, latest_block: details.latest_block, + earliest_block_num: details.earliest_block_num, }) .collect()) } diff --git a/backend/crates/common/src/types.rs b/backend/crates/common/src/types.rs index c9573b2..b128e0f 100644 --- a/backend/crates/common/src/types.rs +++ b/backend/crates/common/src/types.rs @@ -39,6 +39,7 @@ pub struct IndexingStatus { pub deployment: SubgraphDeployment, pub network: String, pub latest_block: BlockPointer, + pub earliest_block_num: u64, } impl PartialEq for IndexingStatus { diff --git a/backend/crates/cross-checker/src/main.rs b/backend/crates/cross-checker/src/main.rs index 894a7c6..68483c5 100644 --- a/backend/crates/cross-checker/src/main.rs +++ b/backend/crates/cross-checker/src/main.rs @@ -56,7 +56,7 @@ async fn main() -> anyhow::Result<()> { let indexing_statuses = query_indexing_statuses(indexers).await; info!("Monitor proofs of indexing"); - let pois = query_proofs_of_indexing(indexing_statuses).await; + let pois = query_proofs_of_indexing(indexing_statuses, config.block_choice_policy).await; let write_err = store.write_pois(&pois, store::PoiLiveness::Live).err(); if let Some(err) = write_err { diff --git a/backend/crates/cross-checker/src/tests/proofs_of_indexing.rs b/backend/crates/cross-checker/src/tests/proofs_of_indexing.rs index 62e1f27..61b30bf 100644 --- a/backend/crates/cross-checker/src/tests/proofs_of_indexing.rs +++ b/backend/crates/cross-checker/src/tests/proofs_of_indexing.rs @@ -1,3 +1,4 @@ +use graphix_common::block_choice::BlockChoicePolicy; use graphix_common::queries; use graphix_common::tests::{fast_rng, gen::gen_indexers}; use itertools::Itertools; @@ -12,7 +13,8 @@ async fn proofs_of_indexing() { let indexers = gen_indexers(&mut rng, max_indexers as usize); let indexing_statuses = queries::query_indexing_statuses(indexers).await; - let pois = queries::query_proofs_of_indexing(indexing_statuses); + let pois = + queries::query_proofs_of_indexing(indexing_statuses, BlockChoicePolicy::Earliest); let actual_pois = pois.await.into_iter().collect::>();