Skip to content

Commit

Permalink
added base k8s deployment
Browse files Browse the repository at this point in the history
added pipeline
added brew.sh script
added concurrency to all pipelines
  • Loading branch information
rotarur committed Jul 27, 2023
2 parents 1c69ca3 + 26efba6 commit c046fa3
Show file tree
Hide file tree
Showing 17 changed files with 138 additions and 61 deletions.
53 changes: 25 additions & 28 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ on:
env:
CARGO_TERM_COLOR: always
RUST_BACKTRACE: full
REGISTRY: ghcr.io

jobs:
rustfmt:
Expand All @@ -31,48 +32,44 @@ jobs:
toolchain: ${{ matrix.rust }}
components: rustfmt
override: true
- name: Check formating
- name: Check formatting
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check

docker-api-server:
runs-on: ubuntu-latest/Users/rotarur/projects/edgeandnode/graph-infra/.github/workflows/template-deploy.yaml
docker-build-and-push:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
if: github.ref == 'refs/heads/main'
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
#- name: Login to Docker Hub
# uses: docker/login-action@v2
# with:
# username: ${{ secrets.DOCKERHUB_USERNAME }}
# password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: docker build
- name: Log in to the Container registry
uses: docker/login-action@v2
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push API server Docker image
uses: docker/build-push-action@v4
with:
push: false
tags: edgeandnode/graphix-api-server:latest
context: .
file: ops/api-server.dockerfile
docker-cross-checker:
runs-on: ubuntu-latest
steps:
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
#- name: Login to Docker Hub
# uses: docker/login-action@v2
# with:
# username: ${{ secrets.DOCKERHUB_USERNAME }}
# password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: docker build
push: true
tags: ghcr.io/${{ github.repository_owner }}/graphix-api-server:latest
- name: Build and push cross-checker service Docker image
uses: docker/build-push-action@v4
with:
push: false
tags: edgeandnode/graphix-cross-checker:latest
file: ops/cross-checker.dockerfile
context: .
file: ops/api-server.dockerfile
push: true
tags: ghcr.io/${{ github.repository_owner }}/graphix-cross-checker:latest

build:
name: Build
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/template-deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
- name: Diff ${{ inputs.ENVIRONMENT }}
if: inputs.ENABLE_DIFF
working-directory: k8s
run: kubectl diff -k ${{ inputs.ENVIRONMENT }}
run: kubectl diff -k ${{ inputs.ENVIRONMENT }} | true

- name: Deploy to ${{ inputs.ENVIRONMENT }} cluster
if: inputs.ENABLE_APPLY
Expand Down
6 changes: 2 additions & 4 deletions backend/crates/api-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use async_graphql::{
use async_graphql_warp::{self, GraphQLResponse};
use clap::Parser;
use graphix_common::{api_types as schema, store::Store};
use std::{convert::Infallible, net::Ipv4Addr};
use std::convert::Infallible;
use warp::{
http::{self, Method},
Filter,
Expand Down Expand Up @@ -59,9 +59,7 @@ async fn run_api_server(options: CliOptions, store: Store) {
.or(graphql_playground_route)
.or(graphql_route);

warp::serve(routes)
.run((Ipv4Addr::LOCALHOST, options.port))
.await;
warp::serve(routes).run(([0, 0, 0, 0], options.port)).await;
}

fn init_tracing() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ query IndexingStatuses {
number
hash
}
earliestBlock {
number
}
}
}
}
Expand Down
61 changes: 61 additions & 0 deletions backend/crates/common/src/block_choice.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use crate::prelude::IndexingStatus;
use serde::Deserialize;

#[derive(Copy, Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum BlockChoicePolicy {
// Use the earliest block that all indexers have in common
Earliest,
// Use the block that maximizes the total number of blocks synced across all indexers
MaxSyncedBlocks,
}

impl Default for BlockChoicePolicy {
fn default() -> Self {
BlockChoicePolicy::MaxSyncedBlocks
}
}

impl BlockChoicePolicy {
pub fn choose_block<'a>(
&self,
statuses: impl Iterator<Item = &'a IndexingStatus>,
) -> Option<u64> {
match self {
BlockChoicePolicy::Earliest => statuses
.map(|status| &status.latest_block.number)
.min()
.copied(),
BlockChoicePolicy::MaxSyncedBlocks => {
// Assuming that all statuses have the same `deployment` and `earliest_block_num`,
// this will return the block number that maximizes the total number of blocks
// synced across all indexers.

let mut indexers_ascending: Vec<&'a IndexingStatus> = statuses.collect();
indexers_ascending.sort_by_key(|status| status.latest_block.number);

let mut max_utility = 0;
let mut best_block: Option<u64> = None;

for (i, status) in indexers_ascending.iter().enumerate() {
let remaining_statuses = indexers_ascending.len() - i;
let block_number = status.latest_block.number;
if block_number < status.earliest_block_num {
// This status is inconsistent, ignore it, avoiding overflow.
continue;
}

let utility =
remaining_statuses as u64 * (block_number - status.earliest_block_num);

if utility > max_utility {
max_utility = utility;
best_block = Some(block_number);
}
}

best_block
}
}
}
}
4 changes: 4 additions & 0 deletions backend/crates/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{fs::File, path::Path, sync::Arc};
use tracing::info;

use crate::{
block_choice::BlockChoicePolicy,
indexer::{Indexer, IndexerInterceptor, RealIndexer},
network_subgraph::NetworkSubgraph,
};
Expand All @@ -13,6 +14,9 @@ use crate::{
pub struct Config {
pub database_url: String,
pub sources: Vec<ConfigSource>,
#[serde(default)]
pub block_choice_policy: BlockChoicePolicy,

#[serde(default = "Config::default_polling_period_in_seconds")]
pub polling_period_in_seconds: u64,
}
Expand Down
1 change: 1 addition & 0 deletions backend/crates/common/src/indexer/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl Indexer for IndexerInterceptor {
deployment: status.deployment,
network: status.network,
latest_block: status.latest_block,
earliest_block_num: status.earliest_block_num,
})
.collect();
Ok(hijacked_statuses)
Expand Down
24 changes: 15 additions & 9 deletions backend/crates/common/src/indexer/real_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,13 @@ impl Indexer for RealIndexer {
) -> Vec<ProofOfIndexing> {
let mut pois = vec![];

// Graph Node implements a limit of 10 POI requests per request, so
// split our requests up accordingly.
for requests in requests.chunks(10) {
// Graph Node implements a limit of 10 POI requests per request, so split our requests up
// accordingly.
//
// FIXME: This is temporarily set to 1 until we fix the error: 'Null value resolved for
// non-null field `proofOfIndexing`' Which is probably a Graph Node bug. Setting it to 1
// reduces the impact of this issue.
for requests in requests.chunks(1) {
trace!(
indexer = %self.id(),
batch_size = requests.len(),
Expand Down Expand Up @@ -302,18 +306,19 @@ mod gql_types {
.get(0)
.ok_or_else(|| anyhow!("chain status missing"))?;

let latest_block = match chain.on {
let (latest_block, earliest_block_num) = match &chain.on {
indexing_statuses::IndexingStatusesIndexingStatusesChainsOn::EthereumIndexingStatus(
indexing_statuses::IndexingStatusesIndexingStatusesChainsOnEthereumIndexingStatus {
ref latest_block,
latest_block,
earliest_block,
..
},
) => match latest_block {
Some(block) => BlockPointer {
) => match (latest_block, earliest_block) {
(Some(block), Some(earliest_block)) => (BlockPointer {
number: block.number.parse()?,
hash: Some(block.hash.clone().as_str().try_into()?),
},
None => {
}, earliest_block.number.parse()?),
_ => {
return Err(anyhow!("deployment has not started indexing yet"));
}
},
Expand All @@ -324,6 +329,7 @@ mod gql_types {
deployment: SubgraphDeployment(self.inner.subgraph),
network: chain.network.clone(),
latest_block,
earliest_block_num,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions backend/crates/common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod api_types;
pub mod block_choice;
pub mod config;
mod indexer;
pub mod network_subgraph;
Expand Down
1 change: 0 additions & 1 deletion backend/crates/common/src/network_subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ impl NetworkSubgraph {
&self,
address: &[u8],
) -> anyhow::Result<Arc<dyn IndexerTrait>> {
println!("address is {:?}", hex::encode(address));
let request = GraphqlRequest {
query: Self::INDEXER_BY_ADDRESS_QUERY.to_string(),
variables: BTreeMap::from_iter(vec![(
Expand Down
29 changes: 14 additions & 15 deletions backend/crates/common/src/queries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::block_choice::BlockChoicePolicy;
use crate::indexer::Indexer;
use crate::prelude::{
BlockPointer, IndexingStatus, PoiRequest, ProofOfIndexing, SubgraphDeployment,
};
use crate::prelude::{IndexingStatus, PoiRequest, ProofOfIndexing, SubgraphDeployment};
use crate::prometheus_metrics::metrics;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
Expand Down Expand Up @@ -74,6 +73,7 @@ pub async fn query_indexing_statuses(indexers: Vec<Arc<dyn Indexer>>) -> Vec<Ind

pub async fn query_proofs_of_indexing(
indexing_statuses: Vec<IndexingStatus>,
block_choice_policy: BlockChoicePolicy,
) -> Vec<ProofOfIndexing> {
info!("Query POIs for recent common blocks across indexers");

Expand Down Expand Up @@ -102,17 +102,13 @@ pub async fn query_proofs_of_indexing(
)
}));

// For each deployment, identify the latest block number that all indexers have in common
let latest_blocks: HashMap<SubgraphDeployment, Option<BlockPointer>> =
// For each deployment, chooose a block on which to query the PoI
let latest_blocks: HashMap<SubgraphDeployment, Option<u64>> =
HashMap::from_iter(deployments.iter().map(|deployment| {
(
deployment.clone(),
statuses_by_deployment.get(deployment).and_then(|statuses| {
statuses
.iter()
.map(|status| &status.latest_block)
.min_by_key(|block| block.number)
.cloned()
block_choice_policy.choose_block(statuses.iter().map(|&s| s))
}),
)
}));
Expand All @@ -123,17 +119,20 @@ pub async fn query_proofs_of_indexing(
.map(|indexer| async {
let poi_requests = latest_blocks
.iter()
.filter(|(deployment, _)| {
.filter(|(deployment, &block_number)| {
statuses_by_deployment
.get(*deployment)
.expect("bug in matching deployments to latest blocks and indexers")
.iter()
.any(|status| status.indexer.eq(indexer))
.any(|status| {
status.indexer.eq(indexer)
&& Some(status.latest_block.number) >= block_number
})
})
.filter_map(|(deployment, block)| {
block.clone().map(|block| PoiRequest {
.filter_map(|(deployment, block_number)| {
block_number.map(|block_number| PoiRequest {
deployment: deployment.clone(),
block_number: block.number,
block_number: block_number,
})
})
.collect::<Vec<_>>();
Expand Down
4 changes: 3 additions & 1 deletion backend/crates/common/src/store/tests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::block_choice::BlockChoicePolicy;
use crate::tests::{fast_rng, gen::gen_indexers};
use crate::{
queries,
Expand All @@ -16,7 +17,8 @@ async fn poi_db_roundtrip() {
let indexers = gen_indexers(&mut rng, 100);

let indexing_statuses = queries::query_indexing_statuses(indexers).await;
let pois = queries::query_proofs_of_indexing(indexing_statuses).await;
let pois =
queries::query_proofs_of_indexing(indexing_statuses, BlockChoicePolicy::Earliest).await;

let store = Store::new(&test_db_url()).await.unwrap();
let mut conn = store.test_conn();
Expand Down
1 change: 1 addition & 0 deletions backend/crates/common/src/tests/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ where
network: "mainnet".into(),
latest_block: blocks.iter().choose(&mut rng).unwrap().clone(),
canonical_pois: gen_pois(blocks.clone(), &mut rng),
earliest_block_num: blocks[0].number,
})
.collect();

Expand Down
2 changes: 2 additions & 0 deletions backend/crates/common/src/tests/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct DeploymentDetails {
pub network: String,
pub latest_block: BlockPointer,
pub canonical_pois: Vec<PartialProofOfIndexing>,
pub earliest_block_num: u64,
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
Expand Down Expand Up @@ -45,6 +46,7 @@ impl Indexer for MockIndexer {
deployment: details.deployment,
network: details.network,
latest_block: details.latest_block,
earliest_block_num: details.earliest_block_num,
})
.collect())
}
Expand Down
1 change: 1 addition & 0 deletions backend/crates/common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct IndexingStatus {
pub deployment: SubgraphDeployment,
pub network: String,
pub latest_block: BlockPointer,
pub earliest_block_num: u64,
}

impl PartialEq for IndexingStatus {
Expand Down
2 changes: 1 addition & 1 deletion backend/crates/cross-checker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn main() -> anyhow::Result<()> {
let indexing_statuses = query_indexing_statuses(indexers).await;

info!("Monitor proofs of indexing");
let pois = query_proofs_of_indexing(indexing_statuses).await;
let pois = query_proofs_of_indexing(indexing_statuses, config.block_choice_policy).await;

let write_err = store.write_pois(&pois, store::PoiLiveness::Live).err();
if let Some(err) = write_err {
Expand Down
Loading

0 comments on commit c046fa3

Please sign in to comment.