Skip to content

Commit

Permalink
feat: API endpoints to query historical pair data
Browse files Browse the repository at this point in the history
  • Loading branch information
michael1011 committed Jan 21, 2025
1 parent 15e15af commit 0413635
Show file tree
Hide file tree
Showing 17 changed files with 782 additions and 20 deletions.
97 changes: 97 additions & 0 deletions boltzr/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions boltzr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ flate2 = "1.0.35"
pyroscope = { version = "0.5.8", optional = true }
pyroscope_pprofrs = { version = "0.2.8", optional = true }
csv = "1.3.1"
axum-extra = { version = "0.10.0", features = ["typed-header"] }
redis = { version = "0.28.1", features = ["tokio-comp", "r2d2"] }

[build-dependencies]
built = { version = "0.7.5", features = ["git2"] }
Expand Down
68 changes: 68 additions & 0 deletions boltzr/src/api/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use axum::body::Body;
use axum::extract::Request;
use axum::http::header::CONTENT_TYPE;
use axum::http::StatusCode;
use axum::middleware::Next;
use axum::response::{IntoResponse, Response};
use axum::Json;
use serde::Serialize;

#[derive(Serialize)]
pub struct ApiError {
pub error: String,
}

pub struct AxumError(anyhow::Error);

impl IntoResponse for AxumError {
fn into_response(self) -> Response {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError {
error: format!("{}", self.0),
}),
)
.into_response()
}
}

impl<E> From<E> for AxumError
where
E: Into<anyhow::Error>,
{
fn from(err: E) -> Self {
Self(err.into())
}
}

pub async fn error_middleware(request: Request<Body>, next: Next) -> Response<Body> {
let response = next.run(request).await;

if response.status().is_server_error() || response.status().is_client_error() {
if let Some(content_type) = response.headers().get(CONTENT_TYPE) {
// If the Content-Type is not JSON, make it JSON
if content_type != "application/json" {
let (parts, body) = response.into_parts();
let body_str = match axum::body::to_bytes(body, 8192).await {
Ok(bytes) => match std::str::from_utf8(&bytes) {
Ok(str) => str.to_string(),
Err(_) => return Response::from_parts(parts, Body::from(bytes)),
},
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError {
error: format!("could not handle body: {}", err),
}),
)
.into_response()
}
};

return (parts.status, Json(ApiError { error: body_str })).into_response();
}
}
}

response
}
35 changes: 35 additions & 0 deletions boltzr/src/api/headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use axum_extra::headers::{Error, HeaderName, HeaderValue};

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Referral(String);

impl Referral {
pub fn inner(&self) -> &str {
&self.0
}
}

impl axum_extra::headers::Header for Referral {
fn name() -> &'static HeaderName {
static NAME: HeaderName = HeaderName::from_static("referral");
&NAME
}

fn decode<'i, I>(values: &mut I) -> Result<Self, Error>
where
Self: Sized,
I: Iterator<Item = &'i HeaderValue>,
{
let value = values.next().ok_or_else(Error::invalid)?;
value
.to_str()
.map_err(|_| Error::invalid())
.map(|value| Self(value.to_owned()))
}

fn encode<E: Extend<HeaderValue>>(&self, values: &mut E) {
values.extend(std::iter::once(
HeaderValue::from_str(&self.0).expect("invalid header value"),
));
}
}
33 changes: 28 additions & 5 deletions boltzr/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use crate::api::errors::error_middleware;
use crate::api::sse::sse_handler;
use crate::api::stats::get_stats;
#[cfg(feature = "metrics")]
use crate::metrics::server::MetricsLayer;
use crate::service::Service;
use axum::routing::get;
use axum::{Extension, Router};
use serde::{Deserialize, Serialize};
Expand All @@ -9,10 +14,10 @@ use tracing::{debug, info};
use ws::status::SwapInfos;
use ws::types::SwapStatus;

#[cfg(feature = "metrics")]
use crate::metrics::server::MetricsLayer;

mod errors;
mod headers;
mod sse;
mod stats;
pub mod ws;

#[derive(Deserialize, Serialize, PartialEq, Clone, Debug)]
Expand All @@ -22,13 +27,18 @@ pub struct Config {
}

pub struct Server<S> {
swap_infos: S,
config: Config,
cancellation_token: CancellationToken,

service: Arc<Service>,

swap_infos: S,
swap_status_update_tx: tokio::sync::broadcast::Sender<Vec<SwapStatus>>,
}

struct ServerState<S> {
service: Arc<Service>,

swap_infos: S,
swap_status_update_tx: tokio::sync::broadcast::Sender<Vec<SwapStatus>>,
}
Expand All @@ -40,11 +50,13 @@ where
pub fn new(
config: Config,
cancellation_token: CancellationToken,
service: Arc<Service>,
swap_infos: S,
swap_status_update_tx: tokio::sync::broadcast::Sender<Vec<SwapStatus>>,
) -> Self {
Server {
config,
service,
swap_infos,
cancellation_token,
swap_status_update_tx,
Expand Down Expand Up @@ -79,6 +91,7 @@ where
axum::serve(
listener,
router.layer(Extension(Arc::new(ServerState {
service: self.service.clone(),
swap_infos: self.swap_infos.clone(),
swap_status_update_tx: self.swap_status_update_tx.clone(),
}))),
Expand All @@ -95,7 +108,13 @@ where
}

fn add_routes(router: Router) -> Router {
router.route("/streamswapstatus", get(sse_handler::<S>))
router
.layer(axum::middleware::from_fn(error_middleware))
.route("/streamswapstatus", get(sse_handler::<S>))
.route(
"/v2/swap/{swap_type}/stats/{from}/{to}",
get(get_stats::<S>),
)
}
}

Expand All @@ -104,8 +123,11 @@ mod test {
use crate::api::ws::status::SwapInfos;
use crate::api::ws::types::SwapStatus;
use crate::api::{Config, Server};
use crate::cache::Redis;
use crate::service::Service;
use async_trait::async_trait;
use reqwest::StatusCode;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Sender;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -150,6 +172,7 @@ mod test {
host: "127.0.0.1".to_string(),
},
cancel.clone(),
Arc::new(Service::new::<Redis>(None, None, None)),
Fetcher {
status_tx: status_tx.clone(),
},
Expand Down
Loading

0 comments on commit 0413635

Please sign in to comment.