Skip to content

Commit

Permalink
fix: shutdown and entry payload (#1)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun authored Oct 7, 2024
1 parent b909d99 commit e892d6b
Show file tree
Hide file tree
Showing 16 changed files with 167 additions and 111 deletions.
42 changes: 28 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ version = "0.1.0"

[workspace.dependencies]
backon = { version = "1.0", features = ["tokio-sleep"] }
base64 = { version = "0.22" }
better-panic = { version = "0.3" }
build-data = { version = "0.2" }
byteorder = { version = "1.5" }
clap = { version = "4.5", features = ["derive"] }
const_format = { version = "0.2" }
ctrlc = "3.4"
error-stack = { version = "0.5" }
fastrace = { version = "0.7", features = ["enable"] }
flexbuffers = { version = "2.0" }
futures = "0.3"
gix-discover = "0.35"
insta = { version = "1.38", features = ["json"] }
Expand Down
2 changes: 2 additions & 0 deletions api/protos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

extern crate core;

pub mod config;
pub mod property;
pub mod rpc;
36 changes: 21 additions & 15 deletions api/protos/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Range;

use serde::Deserialize;
use serde::Serialize;

use crate::property::TopicProps;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Entry {
/// Index of this log entry, assigned by the broker when the message is published. Guaranteed
/// to be unique within the topic. It must not be populated by the publisher in a write
/// call.
#[serde(skip_serializing_if = "Option::is_none")]
pub index: Option<i64>,
/// A padded, base64-encoded string of bytes, encoded with a URL and filename safe alphabet
/// (sometimes referred to as "web-safe" or "base64url"). Defined by [RFC4648].
///
/// [RFC4648]: https://datatracker.ietf.org/doc/html/rfc4648
pub data: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateLogRequest {
pub name: String,
Expand All @@ -31,14 +47,13 @@ pub struct CreateLogResponse {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendLogRequest {
pub name: String,
pub data: Vec<u8>,
pub entry_cnt: i32,
pub entries: Vec<Entry>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendLogResponse {
pub start_offset: i64,
pub end_offset: i64,
/// The half-open offset range of the appended entry.
pub offsets: Range<i64>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -47,18 +62,9 @@ pub struct ReadLogRequest {
pub offset: i64,
}

#[derive(Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReadLogResponse {
pub data: Vec<u8>,
}

// TODO(tisonkun): adopts bytes debug fmt or find other robust ways
impl std::fmt::Debug for ReadLogResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ReadLogResponse")
.field("data", &String::from_utf8_lossy(&self.data))
.finish()
}
pub entries: Vec<Entry>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
Expand Down
16 changes: 11 additions & 5 deletions cmd/morax/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,25 @@ impl CommandStart {
};

morax_runtime::init(&config.runtime);
ctrlc::set_handler(move || {
morax_runtime::shutdown();
})
.change_context_lazy(|| Error("failed to setup ctrl-c signal handle".to_string()))?;
morax_telemetry::init(&config.telemetry);

error_stack::Report::set_color_mode(error_stack::fmt::ColorMode::None);
error_stack::Report::set_charset(error_stack::fmt::Charset::Ascii);

let rt = morax_runtime::make_runtime("morax-main", "morax-main", 1);
rt.block_on(async move {
morax_telemetry::init(&config.telemetry);
let state = morax_server::start(config.server)
.await
.change_context_lazy(|| {
Error("A fatal error has occurred in Morax server process.".to_string())
})?;

let shutdown_handle = state.shutdown_handle();
ctrlc::set_handler(move || {
shutdown_handle();
})
.change_context_lazy(|| Error("failed to setup ctrl-c signal handle".to_string()))?;

state.await_shutdown().await;
Ok(())
})
Expand Down
8 changes: 0 additions & 8 deletions crates/kafka-broker/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@ pub async fn start_broker(
log::info!("Morax Server is closing");
return Ok(());
}
_ = morax_runtime::wait_shutdown() => {
log::info!("Morax Server is shutting down");
return Ok(());
}
socket = broker_listener.accept() => socket,
};

Expand Down Expand Up @@ -100,10 +96,6 @@ async fn process_packet(
log::info!("Morax Server is closing");
return Ok(());
}
_ = morax_runtime::wait_shutdown() => {
log::info!("Morax Server is shutting down");
return Ok(());
}
closed = process_packet_one(&mut socket, &remote_addr, &broker) => {
if closed? {
return Ok(());
Expand Down
3 changes: 2 additions & 1 deletion crates/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ version.workspace = true
test = []

[dependencies]
better-panic = { workspace = true }
error-stack = { workspace = true }
fastrace = { workspace = true }
futures = { workspace = true }
Expand All @@ -38,7 +39,7 @@ tokio = { workspace = true }
tokio-util = { workspace = true }

[dev-dependencies]
pollster = { version = "0.3", features = ['macro'] }
pollster = { version = "0.3" }

[lints]
workspace = true
35 changes: 13 additions & 22 deletions crates/runtime/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::OnceLock;

use latches::task::Latch;
use serde::Deserialize;
use serde::Serialize;

Expand Down Expand Up @@ -45,7 +43,6 @@ struct GlobalRuntimes {
meta_runtime: Runtime,
data_runtime: Runtime,
bg_runtime: Runtime,
shutdown: Arc<Latch>,
}

static GLOBAL_RUNTIMES: OnceLock<GlobalRuntimes> = OnceLock::new();
Expand Down Expand Up @@ -84,23 +81,31 @@ pub fn init(opts: &RuntimeOptions) {
GLOBAL_RUNTIMES.get_or_init(|| do_initialize_runtimes(opts));
}

fn setup_panic_hook() {
std::panic::set_hook(Box::new(move |info| {
let backtrace = std::backtrace::Backtrace::capture();
log::error!("panic occurred: {info}\nbacktrace:\n{backtrace}");
better_panic::Settings::auto().create_panic_handler()(info);
shutdown();
}));
}

fn do_initialize_runtimes(opts: &RuntimeOptions) -> GlobalRuntimes {
log::info!("initializing global runtimes: {opts:?}");

setup_panic_hook();

let api_runtime = make_runtime("api_runtime", "api_thread", opts.api_runtime_threads);
let exec_runtime = make_runtime("exec_runtime", "exec_thread", opts.exec_runtime_threads);
let meta_runtime = make_runtime("meta_runtime", "meta_thread", opts.meta_runtime_threads);
let data_runtime = make_runtime("data_runtime", "data_thread", opts.data_runtime_threads);
let bg_runtime = make_runtime("bg_runtime", "bg_thread", opts.bg_runtime_threads);
let shutdown = Arc::new(Latch::new(1));

GlobalRuntimes {
api_runtime,
exec_runtime,
meta_runtime,
data_runtime,
bg_runtime,
shutdown,
}
}

Expand Down Expand Up @@ -129,20 +134,12 @@ pub fn bg_runtime() -> &'static Runtime {
}

pub fn shutdown() {
let runtimes = fetch_runtimes_or_default();
runtimes.shutdown.count_down();
}

pub async fn wait_shutdown() {
fetch_runtimes_or_default().shutdown.wait().await;
log::info!("The Global Runtimes are shutting down.");
std::process::exit(1);
}

#[cfg(test)]
mod tests {
use core::panic;

use pollster::FutureExt;

use super::*;

#[test]
Expand Down Expand Up @@ -180,10 +177,4 @@ mod tests {
assert_eq!(out, "hello")
}
}

#[test]
fn test_task_panic() {
let _fut = exec_runtime().spawn(async { panic!("test panic") });
wait_shutdown().block_on();
}
}
Loading

0 comments on commit e892d6b

Please sign in to comment.