Merge pull request #6 from lucab/ups/multi-stream
scraper: support scraping multiple streams
Luca Bruno authored Jan 13, 2020
2 parents b14b2f5 + 9c10747 commit b69bc52
Showing 2 changed files with 61 additions and 42 deletions.
57 changes: 43 additions & 14 deletions dumnati/src/main.rs
@@ -24,8 +24,9 @@ use actix::prelude::*;
 use actix_web::{http::Method, middleware::Logger, server, App};
 use actix_web::{HttpRequest, HttpResponse};
 use failure::{Error, Fallible};
+use futures::future;
 use futures::prelude::*;
-use prometheus::{Histogram, IntCounter};
+use prometheus::{Histogram, IntCounter, IntCounterVec, IntGaugeVec};
 use std::collections::HashMap;
 use std::net::{IpAddr, Ipv4Addr};
 use std::sync::Arc;
@@ -48,6 +49,26 @@ lazy_static::lazy_static! {
         prometheus::linear_buckets(0.0, 0.1, 11).unwrap()
     )
     .unwrap();
+    static ref GRAPH_FINAL_EDGES: IntGaugeVec = register_int_gauge_vec!(
+        "dumnati_gb_scraper_graph_final_edges",
+        "Number of edges in the cached graph, after processing",
+        &["stream"]
+    ).unwrap();
+    static ref GRAPH_FINAL_RELEASES: IntGaugeVec = register_int_gauge_vec!(
+        "dumnati_gb_scraper_graph_final_releases",
+        "Number of releases in the cached graph, after processing",
+        &["stream"]
+    ).unwrap();
+    static ref LAST_REFRESH: IntGaugeVec = register_int_gauge_vec!(
+        "dumnati_gb_scraper_graph_last_refresh_timestamp",
+        "UTC timestamp of last graph refresh",
+        &["stream"]
+    ).unwrap();
+    static ref UPSTREAM_SCRAPES: IntCounterVec = register_int_counter_vec!(
+        "dumnati_gb_scraper_upstream_scrapes_total",
+        "Total number of upstream scrapes",
+        &["stream"]
+    ).unwrap();
 }
 
 fn main() -> Fallible<()> {
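
Note: the move from plain IntGauge/IntCounter to the Vec variants above is what makes the metrics multi-stream: one registered family fans out into an independent series per "stream" label value. A minimal standalone sketch of that pattern (the metric name here is made up, not dumnati's):

    use prometheus::{register_int_counter_vec, IntCounterVec};

    lazy_static::lazy_static! {
        // One metric family; each label value gets its own series on demand.
        static ref SCRAPES: IntCounterVec = register_int_counter_vec!(
            "demo_scrapes_total",
            "Total number of scrapes",
            &["stream"]
        ).unwrap();
    }

    fn main() {
        SCRAPES.with_label_values(&["testing"]).inc();
        SCRAPES.with_label_values(&["stable"]).inc();
        SCRAPES.with_label_values(&["testing"]).inc();
        // Counters are tracked independently per label value.
        assert_eq!(SCRAPES.with_label_values(&["testing"]).get(), 2);
        assert_eq!(SCRAPES.with_label_values(&["stable"]).get(), 1);
    }
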
@@ -58,11 +79,17 @@ fn main() -> Fallible<()> {
 
     let sys = actix::System::new("dumnati");
 
-    let scraper_addr = scraper::Scraper::new("testing")?.start();
+    // TODO(lucab): figure out all configuration params.
+    let streams_cfg = maplit::btreeset!["testing", "stable"];
+    let mut scrapers = HashMap::with_capacity(streams_cfg.len());
+    for stream in streams_cfg {
+        let addr = scraper::Scraper::new(stream)?.start();
+        scrapers.insert(stream.to_string(), addr);
+    }
 
     let node_population = Arc::new(cbloom::Filter::new(10 * 1024 * 1024, 1_000_000));
     let service_state = AppState {
-        scraper_addr,
+        scrapers,
         population: Arc::clone(&node_population),
     };
     let gb_service = service_state.clone();
@@ -112,7 +139,7 @@
 
 #[derive(Clone, Debug)]
 pub(crate) struct AppState {
-    scraper_addr: Addr<scraper::Scraper>,
+    scrapers: HashMap<String, Addr<scraper::Scraper>>,
     population: Arc<cbloom::Filter>,
 }
 
@@ -130,11 +157,12 @@ pub(crate) fn gb_serve_graph(
         .map(String::from)
         .unwrap_or_default();
 
-    let cached_graph = req
-        .state()
-        .scraper_addr
-        .send(scraper::GetCachedGraph { stream })
-        .flatten();
+    let addr = match req.state().scrapers.get(&stream) {
+        None => return Box::new(future::ok(HttpResponse::NotFound().finish())),
+        Some(addr) => addr,
+    };
+
+    let cached_graph = addr.send(scraper::GetCachedGraph { stream }).flatten();
 
     let resp = cached_graph
         .and_then(|graph| policy::pick_basearch(graph, basearch))
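
Note: the match above is the request-routing half of the change: a stream with no registered scraper now short-circuits to 404 Not Found before any actor is messaged. A stripped-down sketch of that control flow, with plain strings standing in for actor addresses (all names here illustrative):

    use std::collections::HashMap;

    #[derive(Debug, PartialEq)]
    enum Reply {
        Ok(String),
        NotFound,
    }

    // Look up the per-stream handler; unknown streams get a 404-style reply.
    fn serve(scrapers: &HashMap<String, String>, stream: &str) -> Reply {
        match scrapers.get(stream) {
            None => Reply::NotFound,
            Some(addr) => Reply::Ok(format!("graph via {}", addr)),
        }
    }

    fn main() {
        let mut scrapers = HashMap::new();
        scrapers.insert("testing".to_string(), "scraper-testing".to_string());
        scrapers.insert("stable".to_string(), "scraper-stable".to_string());

        assert!(matches!(serve(&scrapers, "stable"), Reply::Ok(_)));
        assert_eq!(serve(&scrapers, "unknown"), Reply::NotFound);
    }
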
@@ -167,14 +195,15 @@ pub(crate) fn pe_serve_graph(
         .map(String::from)
         .unwrap_or_default();
 
+    let addr = match req.state().scrapers.get(&stream) {
+        None => return Box::new(future::ok(HttpResponse::NotFound().finish())),
+        Some(addr) => addr,
+    };
+
     let wariness = compute_wariness(&req.query());
     ROLLOUT_WARINESS.observe(wariness);
 
-    let cached_graph = req
-        .state()
-        .scraper_addr
-        .send(scraper::GetCachedGraph { stream })
-        .flatten();
+    let cached_graph = addr.send(scraper::GetCachedGraph { stream }).flatten();
 
     let resp = cached_graph
         .and_then(|graph| policy::pick_basearch(graph, basearch))
46 changes: 18 additions & 28 deletions dumnati/src/scraper.rs
@@ -3,34 +3,14 @@ use actix::prelude::*;
 use failure::{Error, Fallible};
 use futures::future;
 use futures::prelude::*;
-use prometheus::{IntCounter, IntGauge};
 use reqwest::Method;
 
-lazy_static::lazy_static! {
-    static ref GRAPH_FINAL_EDGES: IntGauge = register_int_gauge!(opts!(
-        "dumnati_gb_scraper_graph_final_edges",
-        "Number of edges in the cached graph, after processing"
-    )).unwrap();
-    static ref GRAPH_FINAL_RELEASES: IntGauge = register_int_gauge!(opts!(
-        "dumnati_gb_scraper_graph_final_releases",
-        "Number of releases in the cached graph, after processing"
-    )).unwrap();
-    static ref LAST_REFRESH: IntGauge = register_int_gauge!(opts!(
-        "dumnati_gb_scraper_graph_last_refresh_timestamp",
-        "UTC timestamp of last graph refresh"
-    )).unwrap();
-    static ref UPSTREAM_SCRAPES: IntCounter = register_int_counter!(opts!(
-        "dumnati_gb_scraper_upstream_scrapes_total",
-        "Total number of upstream scrapes"
-    ))
-    .unwrap();
-}
-
 /// Release scraper.
 #[derive(Clone, Debug)]
 pub struct Scraper {
     graph: graph::Graph,
     hclient: reqwest::r#async::Client,
+    stream: String,
     stream_metadata_url: reqwest::Url,
     release_index_url: reqwest::Url,
 }
@@ -40,12 +20,14 @@ impl Scraper {
     where
         S: Into<String>,
     {
-        let vars = hashmap! { "stream".to_string() => stream.into() };
+        let stream = stream.into();
+        let vars = hashmap! { "stream".to_string() => stream.clone() };
         let releases_json = envsubst::substitute(metadata::RELEASES_JSON, &vars)?;
         let stream_json = envsubst::substitute(metadata::STREAM_JSON, &vars)?;
         let scraper = Self {
             graph: graph::Graph::default(),
             hclient: reqwest::r#async::ClientBuilder::new().build()?,
+            stream,
             release_index_url: reqwest::Url::parse(&releases_json)?,
             stream_metadata_url: reqwest::Url::parse(&stream_json)?,
        };
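
Note: metadata::RELEASES_JSON and metadata::STREAM_JSON are URL templates containing a ${stream} placeholder, expanded once per scraper. A small sketch of that expansion, using a made-up template constant (the real ones live in dumnati's metadata module and may differ):

    // Hypothetical template; dumnati's actual constants may differ.
    const RELEASES_JSON: &str = "https://example.com/streams/${stream}/releases.json";

    fn main() {
        let vars = maplit::hashmap! {
            "stream".to_string() => "stable".to_string(),
        };
        // envsubst replaces ${stream} with the matching map entry.
        let url = envsubst::substitute(RELEASES_JSON, &vars).unwrap();
        assert_eq!(url, "https://example.com/streams/stable/releases.json");
    }
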
@@ -114,7 +96,9 @@ impl Handler<RefreshTick> for Scraper {
     type Result = ResponseActFuture<Self, (), Error>;
 
     fn handle(&mut self, _msg: RefreshTick, _ctx: &mut Self::Context) -> Self::Result {
-        UPSTREAM_SCRAPES.inc();
+        crate::UPSTREAM_SCRAPES
+            .with_label_values(&[&self.stream])
+            .inc();
 
         let updates = self.assemble_graph();
 
@@ -123,9 +107,15 @@
             .map(|graph, actor, _ctx| {
                 actor.graph = graph;
                 let refresh_timestamp = chrono::Utc::now();
-                LAST_REFRESH.set(refresh_timestamp.timestamp());
-                GRAPH_FINAL_EDGES.set(actor.graph.edges.len() as i64);
-                GRAPH_FINAL_RELEASES.set(actor.graph.nodes.len() as i64);
+                crate::LAST_REFRESH
+                    .with_label_values(&[&actor.stream])
+                    .set(refresh_timestamp.timestamp());
+                crate::GRAPH_FINAL_EDGES
+                    .with_label_values(&[&actor.stream])
+                    .set(actor.graph.edges.len() as i64);
+                crate::GRAPH_FINAL_RELEASES
+                    .with_label_values(&[&actor.stream])
+                    .set(actor.graph.nodes.len() as i64);
             })
             .then(|_r, _actor, ctx| {
                 Self::tick_later(ctx, std::time::Duration::from_secs(30));
@@ -156,7 +146,7 @@
     type Result = ResponseActFuture<Self, graph::Graph, Error>;
     fn handle(&mut self, msg: GetCachedGraph, _ctx: &mut Self::Context) -> Self::Result {
         use failure::format_err;
-        if msg.stream != "testing" {
+        if msg.stream != self.stream {
             return Box::new(actix::fut::err(format_err!(
                 "unexpected stream '{}'",
                 msg.stream
@@ -167,7 +157,7 @@
 }
 
 impl Scraper {
-    /// Schedule an immediate refresh the state machine.
+    /// Schedule an immediate refresh of the state machine.
     pub fn tick_now(ctx: &mut Context<Self>) {
         ctx.notify(RefreshTick {})
     }
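
Note: with one Scraper actor per stream, the guard above becomes an ownership check: each actor only answers GetCachedGraph for the stream it was created with. The same logic, minus the actix machinery (types here are illustrative stand-ins):

    struct Scraper {
        stream: String,
    }

    impl Scraper {
        // Each scraper serves exactly the stream it was built for;
        // anything else is rejected with an error.
        fn get_cached_graph(&self, requested: &str) -> Result<&'static str, String> {
            if requested != self.stream {
                return Err(format!("unexpected stream '{}'", requested));
            }
            Ok("graph")
        }
    }

    fn main() {
        let scraper = Scraper { stream: "stable".to_string() };
        assert!(scraper.get_cached_graph("stable").is_ok());
        assert!(scraper.get_cached_graph("testing").is_err());
    }
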
