Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] fcos-cincinnati: port to actix-web 4.1 #79

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,219 changes: 381 additions & 838 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions commons/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ edition = "2018"
publish = false

[dependencies]
actix-cors = "^0.2"
actix-web = "^2.0.0"
actix-cors = "0.6.1"
actix-web = "4.1"
chrono = "^0.4.7"
failure = "^0.1.1"
maplit = "^1.0"
Expand Down
18 changes: 8 additions & 10 deletions commons/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@
use actix_web::HttpResponse;

/// Serve metrics requests (Prometheus textual format).
pub async fn serve_metrics() -> Result<HttpResponse, failure::Error> {
pub async fn serve_metrics() -> HttpResponse {
use prometheus::Encoder;

let content = {
let metrics = prometheus::default_registry().gather();
let txt_enc = prometheus::TextEncoder::new();
let mut buf = vec![];
txt_enc.encode(&metrics, &mut buf)?;
buf
};

Ok(HttpResponse::Ok().body(content))
let metrics = prometheus::default_registry().gather();
let txt_enc = prometheus::TextEncoder::new();
let mut buf = vec![];
match txt_enc.encode(&metrics, &mut buf) {
Err(_) => HttpResponse::InternalServerError().finish(),
Ok(content) => HttpResponse::Ok().body(content),
}
}
8 changes: 4 additions & 4 deletions commons/src/web.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::graph::GraphScope;
use actix_cors::CorsFactory;
use actix_cors::Cors;
use failure::{bail, ensure, err_msg};
use std::collections::HashSet;

/// Build a CORS middleware.
///
/// By default, this allows all CORS requests from all origins.
/// If an allowlist is provided, only those origins are allowed instead.
pub fn build_cors_middleware(origin_allowlist: &Option<Vec<String>>) -> CorsFactory {
let mut builder = actix_cors::Cors::new();
pub fn build_cors_middleware(origin_allowlist: &Option<Vec<String>>) -> Cors {
let mut builder = Cors::default();
match origin_allowlist {
Some(allowed) => {
for origin in allowed {
Expand All @@ -19,7 +19,7 @@ pub fn build_cors_middleware(origin_allowlist: &Option<Vec<String>>) -> CorsFact
builder = builder.send_wildcard();
}
};
builder.finish()
builder
}

/// Validate input query parameters into a valid graph scope.
Expand Down
9 changes: 4 additions & 5 deletions fcos-graph-builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ edition = "2018"
publish = false

[dependencies]
actix = "^0.9.0"
actix-web = "^2.0.0"
actix = "0.13"
actix-web = "4.1"
cbloom = "^0.1.3"
chrono = "^0.4.7"
commons = { path = "../commons" }
Expand All @@ -19,8 +19,7 @@ lazy_static = "^1.3.0"
log = "^0.4.3"
maplit = "^1.0"
prometheus = "0.13"
reqwest = { version = "^0.10.1", features = ["json"] }
serde = "^1.0.70"
serde_derive = "^1.0.70"
reqwest = { version = "^0.11.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "^1.0.22"
structopt = "^0.3.7"
22 changes: 18 additions & 4 deletions fcos-graph-builder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use actix::prelude::*;
use actix_web::{web, App, HttpResponse};
use commons::{graph, metrics};
use failure::{Fallible, ResultExt};
use futures::FutureExt;
use prometheus::{IntCounterVec, IntGauge, IntGaugeVec};
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -67,7 +68,7 @@ fn main() -> Fallible<()> {
.try_init()
.context("failed to initialize logging")?;

let sys = actix::System::new("fcos_cincinnati_gb");
let sys = actix::System::new();

// Parse config file and validate settings.
let (service_settings, status_settings) = {
Expand All @@ -79,7 +80,8 @@ fn main() -> Fallible<()> {

let mut scrapers = HashMap::with_capacity(service_settings.scopes.len());
for scope in &service_settings.scopes {
let addr = scraper::Scraper::new(scope.clone())?.start();
let entry = scraper::Scraper::new(scope.clone())?;
let addr = sys.block_on(async { entry.start() });
scrapers.insert(scope.clone(), addr);
}

Expand All @@ -97,7 +99,7 @@ fn main() -> Fallible<()> {
let service_socket = service_settings.socket_addr();
debug!("main service address: {}", service_socket);
let gb_service = service_state.clone();
actix_web::HttpServer::new(move || {
let serv = actix_web::HttpServer::new(move || {
App::new()
.wrap(commons::web::build_cors_middleware(
&service_settings.origin_allowlist,
Expand All @@ -107,18 +109,20 @@ fn main() -> Fallible<()> {
})
.bind(service_socket)?
.run();
actix::System::current().arbiter().spawn(serv.map(|_| ()));

// Graph-builder status service.
let status_socket = status_settings.socket_addr();
debug!("status service address: {}", status_socket);
let gb_status = service_state;
actix_web::HttpServer::new(move || {
let serv2 = actix_web::HttpServer::new(move || {
App::new()
.data(gb_status.clone())
.route("/metrics", web::get().to(metrics::serve_metrics))
})
.bind(status_socket)?
.run();
actix::System::current().arbiter().spawn(serv2.map(|_| ()));

sys.run()?;
Ok(())
Expand All @@ -140,6 +144,16 @@ struct GraphQuery {
pub(crate) async fn gb_serve_graph(
data: web::Data<AppState>,
web::Query(query): web::Query<GraphQuery>,
) -> HttpResponse {
match gb_serve_graph_inner(data, query).await {
Err(_) => HttpResponse::InternalServerError().finish(),
Ok(resp) => resp,
}
}

pub(crate) async fn gb_serve_graph_inner(
data: web::Data<AppState>,
query: GraphQuery,
) -> Result<HttpResponse, failure::Error> {
let scope = match commons::web::validate_scope(query.basearch, query.stream, &data.scope_filter)
{
Expand Down
9 changes: 5 additions & 4 deletions fcos-graph-builder/src/scraper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use actix_web::web::Bytes;
use commons::{graph, metadata};
use failure::{Error, Fallible};
use reqwest::Method;
use std::future::Future;
use std::num::NonZeroU64;
use std::time::Duration;

Expand Down Expand Up @@ -166,7 +167,7 @@ impl Handler<RefreshTick> for Scraper {
actix::fut::ok(())
});

Box::new(update_graph)
Box::pin(update_graph)
}
}

Expand All @@ -185,13 +186,13 @@ impl Handler<GetCachedGraph> for Scraper {
use failure::format_err;

if msg.scope.basearch != self.scope.basearch {
return Box::new(actix::fut::err(format_err!(
return Box::pin(actix::fut::err(format_err!(
"unexpected basearch '{}'",
msg.scope.basearch
)));
}
if msg.scope.stream != self.scope.stream {
return Box::new(actix::fut::err(format_err!(
return Box::pin(actix::fut::err(format_err!(
"unexpected stream '{}'",
msg.scope.stream
)));
Expand All @@ -201,7 +202,7 @@ impl Handler<GetCachedGraph> for Scraper {
.with_label_values(&[&self.scope.basearch, &self.scope.stream])
.inc();

Box::new(actix::fut::ok(self.graph.clone()))
Box::pin(actix::fut::ok(self.graph.clone()))
}
}

Expand Down
9 changes: 4 additions & 5 deletions fcos-policy-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ edition = "2018"
publish = false

[dependencies]
actix = "^0.9.0"
actix-web = "^2.0.0"
actix = "0.13"
actix-web = "4.1"
cbloom = "^0.1.3"
chrono = "^0.4.7"
commons = { path = "../commons" }
Expand All @@ -19,9 +19,8 @@ lazy_static = "^1.3.0"
log = "^0.4.3"
maplit = "^1.0"
prometheus = "0.13"
reqwest = { version = "^0.10.1", features = ["json"] }
serde = "^1.0.70"
serde_derive = "^1.0.70"
reqwest = { version = "^0.11.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "^1.0.22"
serde_qs = "0.9.2"
structopt = "^0.3.7"
19 changes: 16 additions & 3 deletions fcos-policy-engine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod utils;
use actix_web::{web, App, HttpResponse};
use commons::{graph, metrics, policy};
use failure::{Error, Fallible, ResultExt};
use futures::FutureExt;
use prometheus::{Histogram, IntCounter, IntGauge};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
Expand Down Expand Up @@ -67,7 +68,7 @@ fn main() -> Fallible<()> {
(settings.service, settings.status)
};

let sys = actix::System::new("fcos_cincinnati_pe");
let sys = actix::System::new();

let node_population = Arc::new(cbloom::Filter::new(
service_settings.bloom_size,
Expand All @@ -92,7 +93,7 @@ fn main() -> Fallible<()> {
// Policy-engine main service.
let service_socket = service_settings.socket_addr();
debug!("main service address: {}", service_socket);
actix_web::HttpServer::new(move || {
let serv = actix_web::HttpServer::new(move || {
App::new()
.wrap(commons::web::build_cors_middleware(
&service_settings.origin_allowlist,
Expand All @@ -102,16 +103,18 @@ fn main() -> Fallible<()> {
})
.bind(service_socket)?
.run();
actix::System::current().arbiter().spawn(serv.map(|_| ()));

// Policy-engine status service.
let status_socket = status_settings.socket_addr();
debug!("status service address: {}", status_socket);
actix_web::HttpServer::new(move || {
let serv2 = actix_web::HttpServer::new(move || {
App::new().route("/metrics", web::get().to(metrics::serve_metrics))
})
.bind(status_socket)?
.run();

actix::System::current().arbiter().spawn(serv2.map(|_| ()));
sys.run()?;
Ok(())
}
Expand All @@ -136,6 +139,16 @@ pub struct GraphQuery {
pub(crate) async fn pe_serve_graph(
data: web::Data<AppState>,
web::Query(query): web::Query<GraphQuery>,
) -> HttpResponse {
match pe_serve_graph_inner(data, query).await {
Err(_) => HttpResponse::InternalServerError().finish(),
Ok(resp) => resp,
}
}

pub(crate) async fn pe_serve_graph_inner(
data: web::Data<AppState>,
query: GraphQuery,
) -> Result<HttpResponse, Error> {
pe_record_metrics(&data, &query);

Expand Down