Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection pool #9

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
318 changes: 316 additions & 2 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ rand = "0.8.5"
clap = { version = "4.5.16", features = ["derive"] }
uuid = { version = "1.10.0", features = ["v4", "fast-rng"] }
thiserror = "1.0.64"
axum = "0.7.7"
bytes = "1.8.0"
1 change: 1 addition & 0 deletions ekilibri.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ connection_timeout = 1000
write_timeout = 1000
read_timeout = 1000
health_check_path = "/health"
pool_size = 10
116 changes: 33 additions & 83 deletions src/bin/command.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use std::time::Duration;

use clap::Parser;
use tokio::{
io::AsyncWriteExt,
net::{TcpListener, TcpStream},
time,
use axum::{
body::Bytes,
http::{header, HeaderMap, StatusCode},
routing::{get, post},
Router,
};
use clap::Parser;
use tokio::{net::TcpListener, time};

use tracing::{debug, info, warn};

use uuid::Uuid;

use ekilibri::http::{parse_request, Method, ParsingError, CRLF};
use tracing::info;

#[derive(Debug, Parser)]
struct Args {
Expand All @@ -25,6 +23,12 @@ async fn main() {

let args = Args::parse();
let port = args.port;

let app = Router::new()
.route("/health", get(health))
.route("/sleep", get(sleep))
.route("/echo", post(echo));

let listener = match TcpListener::bind(format!("127.0.0.1:{port}")).await {
Ok(listener) => listener,
Err(e) => panic!(
Expand All @@ -33,84 +37,30 @@ async fn main() {
),
};

loop {
accept_and_handle_connection(&listener).await;
}
axum::serve(listener, app).await.unwrap();
}

async fn accept_and_handle_connection(listener: &TcpListener) {
match listener.accept().await {
Ok((stream, _)) => {
tokio::spawn(async move {
process_request(stream).await;
});
}
Err(_) => eprintln!("Error listening to socket"),
}
async fn health() -> StatusCode {
info!("Received request for /health");
StatusCode::OK
}

async fn process_request(mut stream: TcpStream) {
let request_id = Uuid::new_v4();
let request = match parse_request(&request_id, &mut stream).await {
Ok((request, _)) => request,
Err(e) => {
let status = match e {
ParsingError::MissingContentLength => "411",
ParsingError::HTTPVersionNotSupported => "505",
_ => "400",
};
let response = format!("HTTP/1.1 {status}{CRLF}{CRLF}");
if let Err(e) = stream.write_all(response.as_bytes()).await {
debug!("Unable to send response to the client {e}");
}
return;
}
};
async fn sleep() -> StatusCode {
info!("Received request for /sleep");
time::sleep(Duration::from_millis(2000)).await;
StatusCode::OK
}

let response = match request.method {
Method::Get => match request.path.as_str() {
"/sleep" => {
info!("Received request for /sleep, request_id={request_id}");
time::sleep(Duration::from_millis(2000)).await;
format!("HTTP/1.1 200{CRLF}{CRLF}")
}
"/health" => {
info!("Received request for /health, request_id={request_id}");
format!("HTTP/1.1 200{CRLF}{CRLF}")
}
_ => {
info!("Received request for unmapped path, request_id={request_id}");
format!("HTTP/1.1 404{CRLF}{CRLF}")
}
},
Method::Post => match request.path.as_str() {
"/echo" => {
info!("Received request for /echo, request_id={request_id}");
let length = match request.headers.get("content-length") {
Some(value) => value,
None => "0",
};
let content_length = format!("Content-Length: {length}");
let content_type = match request.headers.get("content-type") {
Some(value) => value,
None => "text/plain",
};
let content_type = format!("Content-Type: {content_type}");
let body = request.body.unwrap_or_default();
format!("HTTP/1.1 200{CRLF}{content_length}{CRLF}{content_type}{CRLF}{CRLF}{body}")
}
_ => {
info!("Received request for unmapped path, request_id={request_id}");
format!("HTTP/1.1 404{CRLF}{CRLF}")
}
},
Method::Unknown => {
warn!("Received request for unmapped path, request_id={request_id}");
format!("HTTP/1.1 404{CRLF}{CRLF}")
}
async fn echo(headers: HeaderMap, body: Bytes) -> Result<(HeaderMap, String), StatusCode> {
info!("Received request for /echo");
let content_type = match headers.get("content-type") {
Some(value) => value.to_str().unwrap(),
None => "text/plain",
};

if let Err(e) = stream.write_all(response.as_bytes()).await {
debug!("Unable to send response to the client {e}");
let mut headers = HeaderMap::new();
headers.insert(header::CONTENT_TYPE, content_type.parse().unwrap());
match String::from_utf8(body.to_vec()) {
Ok(body) => Ok((headers, body)),
Err(_) => Err(StatusCode::BAD_REQUEST),
}
}
Loading
Loading