Skip to content

Commit

Permalink
clean up unused namespaces and channels
Browse files Browse the repository at this point in the history
  • Loading branch information
ckampfe committed Feb 6, 2025
1 parent e49905f commit 4b14d1e
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 207 deletions.
253 changes: 48 additions & 205 deletions src/channel.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::{AppState, Done};
use axum::body::{Body, BodyDataStream};
use axum::extract::{Path, State};
use axum::extract::{Path, Request, State};
use axum::http::{header, HeaderMap, HeaderValue, StatusCode};
use axum::response::IntoResponse;
use axum::routing::{delete, get, post};
use axum::middleware::{self, Next};
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::Router;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{oneshot, Mutex};

type Namespace = String;
type ChannelName = String;
Expand All @@ -24,50 +25,58 @@ pub(crate) type ChannelClients = Mutex<
>,
>;

pub(crate) fn routes() -> Router<Arc<AppState>> {
pub(crate) fn routes(state: Arc<AppState>) -> Router<Arc<AppState>> {
Router::new()
.route("/channels/namespaces", get(list_all_namespaces))
.route("/channels/{namespace}", get(list_all_namespace_channels))
.route(
"/channels/{namespace}",
delete(delete_namespace_and_all_channels),
)
.route(
"/channels/{namespace}/{channel_name}",
get(subscribe_to_channel),
)
.route(
"/channels/{namespace}/{channel_name}",
post(broadcast_to_channel),
get(subscribe_to_channel).route_layer(middleware::from_fn_with_state(
state.clone(),
clean_up_unused_channels,
)),
)
.route(
"/channels/{namespace}/{channel_name}",
delete(delete_channel),
post(broadcast_to_channel).route_layer(middleware::from_fn_with_state(
state.clone(),
clean_up_unused_channels,
)),
)
}

async fn delete_namespace_and_all_channels(
Path(namespace): Path<String>,
async fn clean_up_unused_channels(
Path((namespace, channel_name)): Path<(String, String)>,
State(state): State<Arc<AppState>>,
) -> axum::response::Result<()> {
let mut channel_clients = state.channel_clients.lock().await;
request: Request,
next: Next,
) -> Response {
let (tx, rx) = oneshot::channel();

channel_clients.remove(&namespace);
tokio::spawn(async move {
let _ = rx.await;

Ok(())
}
let mut channel_clients = state.channel_clients.lock().await;

async fn delete_channel(
Path((namespace, channel_name)): Path<(String, String)>,
State(state): State<Arc<AppState>>,
) -> axum::response::Result<()> {
let mut channel_clients = state.channel_clients.lock().await;
let delete_namespace = if let Some(namespace_channels) = channel_clients.get_mut(&namespace)
{
namespace_channels.remove(&channel_name);

if let Some(channels) = channel_clients.get_mut(&namespace) {
channels.remove(&channel_name);
}
namespace_channels.is_empty()
} else {
false
};

Ok(())
if delete_namespace {
channel_clients.remove(&namespace);
}
});

let response = next.run(request).await;

let _ = tx.send(());

response
}

async fn list_all_namespaces(
Expand Down Expand Up @@ -105,11 +114,9 @@ async fn broadcast_to_channel(
) -> axum::response::Result<()> {
let mut channel_clients = state.channel_clients.lock().await;

let namespace_channels = if let Some(channels) = channel_clients.get_mut(&namespace) {
channels
} else {
channel_clients.insert(namespace.clone(), HashMap::new());
channel_clients.get_mut(&namespace).unwrap()
let namespace_channels = match channel_clients.entry(namespace) {
std::collections::hash_map::Entry::Occupied(e) => e.into_mut(),
std::collections::hash_map::Entry::Vacant(e) => e.insert(HashMap::new()),
};

let tx = if let Some((tx, _rx)) = namespace_channels.get(&channel_name) {
Expand All @@ -124,11 +131,11 @@ async fn broadcast_to_channel(

drop(channel_clients);

let body_stream = body.into_data_stream();
let request_body_stream = body.into_data_stream();

let (done, done_rx) = Done::new();

tx.send_async((body_stream, request_headers, done))
tx.send_async((request_body_stream, request_headers, done))
.await
.map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?;

Expand All @@ -145,11 +152,9 @@ async fn subscribe_to_channel(
) -> axum::response::Result<impl IntoResponse> {
let mut channel_clients = state.channel_clients.lock().await;

let namespace_channels = if let Some(channels) = channel_clients.get_mut(&namespace) {
channels
} else {
channel_clients.insert(namespace.clone(), HashMap::new());
channel_clients.get_mut(&namespace).unwrap()
let namespace_channels = match channel_clients.entry(namespace) {
std::collections::hash_map::Entry::Occupied(e) => e.into_mut(),
std::collections::hash_map::Entry::Vacant(e) => e.insert(HashMap::new()),
};

let rx = if let Some((_tx, rx)) = namespace_channels.get(&channel_name) {
Expand Down Expand Up @@ -420,166 +425,4 @@ mod tests {

assert_eq!(ids, vec!["it_should_autovivify_on_publish"])
}

#[tokio::test]
async fn delete_channel() {
let options = Options::default();

let port = get_port();

let listener = tokio::net::TcpListener::bind(("0.0.0.0", port))
.await
.unwrap();

let (_done, done_rx) = Done::new();

tokio::spawn(async move {
axum::serve(listener, app(options))
.with_graceful_shutdown(async move { done_rx.await.unwrap() })
.await
.unwrap();
});

tokio::spawn(async move {
reqwest::Client::new()
.post(format!(
"http://localhost:{port}/channels/a_great_ns/it_should_autovivify_on_publish"
))
.body("some body")
.send()
.await
.unwrap()
});

reqwest::get(format!(
"http://localhost:{port}/channels/a_great_ns/it_should_autovivify_on_publish"
))
.await
.unwrap();

let namespaces: HashSet<String> =
reqwest::get(format!("http://localhost:{port}/channels/namespaces"))
.await
.unwrap()
.json()
.await
.unwrap();

assert_eq!(namespaces, HashSet::from(["a_great_ns".to_string()]));

let ids: Vec<String> = reqwest::get(format!("http://localhost:{port}/channels/a_great_ns"))
.await
.unwrap()
.json()
.await
.unwrap();

assert_eq!(ids, vec!["it_should_autovivify_on_publish"]);

reqwest::Client::new()
.delete(format!(
"http://localhost:{port}/channels/a_great_ns/it_should_autovivify_on_publish"
))
.send()
.await
.unwrap();

let ids: Vec<String> = reqwest::get(format!("http://localhost:{port}/channels/a_great_ns"))
.await
.unwrap()
.json()
.await
.unwrap();

assert_eq!(ids, Vec::<String>::new());

let namespaces: HashSet<String> =
reqwest::get(format!("http://localhost:{port}/channels/namespaces"))
.await
.unwrap()
.json()
.await
.unwrap();

assert_eq!(namespaces, HashSet::from(["a_great_ns".to_string()]));
}

#[tokio::test]
async fn delete_namespace_and_all_channels() {
let options = Options::default();

let port = get_port();

let listener = tokio::net::TcpListener::bind(("0.0.0.0", port))
.await
.unwrap();

let (_done, done_rx) = Done::new();

tokio::spawn(async move {
axum::serve(listener, app(options))
.with_graceful_shutdown(async move { done_rx.await.unwrap() })
.await
.unwrap();
});

tokio::spawn(async move {
reqwest::Client::new()
.post(format!(
"http://localhost:{port}/channels/a_great_ns/it_should_autovivify_on_publish"
))
.body("some body")
.send()
.await
.unwrap()
});

reqwest::get(format!(
"http://localhost:{port}/channels/a_great_ns/it_should_autovivify_on_publish"
))
.await
.unwrap();

let namespaces: HashSet<String> =
reqwest::get(format!("http://localhost:{port}/channels/namespaces"))
.await
.unwrap()
.json()
.await
.unwrap();

assert_eq!(namespaces, HashSet::from(["a_great_ns".to_string()]));

let ids: Vec<String> = reqwest::get(format!("http://localhost:{port}/channels/a_great_ns"))
.await
.unwrap()
.json()
.await
.unwrap();

assert_eq!(ids, vec!["it_should_autovivify_on_publish"]);

reqwest::Client::new()
.delete(format!("http://localhost:{port}/channels/a_great_ns"))
.send()
.await
.unwrap();

let ns_status = reqwest::get(format!("http://localhost:{port}/channels/a_great_ns"))
.await
.unwrap()
.status();

assert_eq!(ns_status, StatusCode::NOT_FOUND);

let namespaces: HashSet<String> =
reqwest::get(format!("http://localhost:{port}/channels/namespaces"))
.await
.unwrap()
.json()
.await
.unwrap();

assert_eq!(namespaces, HashSet::new());
}
}
6 changes: 4 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
// - [x] namespaces for channels, e.g., /channels/some_namespace/some_id
// - [ ] /c, /p shorthand endpoints for channels
// - [x] modules
// - [x] automatically delete channels when unused
// - [x] automatically delete namespaces when unused
// - [ ] reevalute API endpoints to be more RESTish
// - [ ] GET only API for browser stuff
// - [ ] add diagram to README to explain what httpipe is
// - [ ] rename to httq?
// - [ ] clean up topics/channels that have no use
// - [x] clean up topics/channels that have no use

use axum::extract::State;
use axum::response::IntoResponse;
Expand Down Expand Up @@ -96,7 +98,7 @@ fn app(options: Options) -> axum::Router {
};
let state = Arc::new(state);

let channels_routes = channel::routes();
let channels_routes = channel::routes(Arc::clone(&state));

let other_routes = Router::new().route("/state", get(app_state));

Expand Down

0 comments on commit 4b14d1e

Please sign in to comment.