diff --git a/Cargo.lock b/Cargo.lock index f8d8697..75ff07d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -229,6 +229,16 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "better-panic" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fa9e1d11a268684cbd90ed36370d7577afb6c62d912ddff5c15fc34343e5036" +dependencies = [ + "backtrace", + "console", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -898,6 +908,19 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flexbuffers" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15d14128f06405808ce75bfebe11e9b0f9da18719ede6d7bdb1702d6bfe0f7e8" +dependencies = [ + "bitflags 1.3.2", + "byteorder", + "num_enum", + "serde", + "serde_derive", +] + [[package]] name = "flume" version = "0.11.0" @@ -1989,6 +2012,7 @@ dependencies = [ name = "morax-runtime" version = "0.1.0" dependencies = [ + "better-panic", "error-stack", "fastrace", "futures", @@ -2054,7 +2078,9 @@ dependencies = [ name = "morax-wal-broker" version = "0.1.0" dependencies = [ + "base64 0.22.1", "error-stack", + "flexbuffers", "log", "mime", "morax-meta", @@ -2062,6 +2088,7 @@ dependencies = [ "morax-runtime", "morax-storage", "poem", + "serde", "serde_json", "thiserror", ] @@ -2512,20 +2539,6 @@ name = "pollster" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22686f4785f02a4fcc856d3b3bb19bf6c8160d103f7a99cc258bddd0251dc7f2" -dependencies = [ - "pollster-macro", -] - -[[package]] -name = "pollster-macro" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea78f0ef4193055a4b09814ce6bcb572ad1174d6023e2f00a9ea1a798d18d301" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] [[package]] name = "powerfmt" @@ -4337,6 +4350,7 
@@ checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" name = "wal-tests" version = "0.1.0" dependencies = [ + "base64 0.22.1", "insta", "log", "morax-protos", diff --git a/Cargo.toml b/Cargo.toml index eee120a..a3108cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,8 @@ version = "0.1.0" [workspace.dependencies] backon = { version = "1.0", features = ["tokio-sleep"] } +base64 = { version = "0.22" } +better-panic = { version = "0.3" } build-data = { version = "0.2" } byteorder = { version = "1.5" } clap = { version = "4.5", features = ["derive"] } @@ -52,6 +54,7 @@ const_format = { version = "0.2" } ctrlc = "3.4" error-stack = { version = "0.5" } fastrace = { version = "0.7", features = ["enable"] } +flexbuffers = { version = "2.0" } futures = "0.3" gix-discover = "0.35" insta = { version = "1.38", features = ["json"] } diff --git a/api/protos/src/lib.rs b/api/protos/src/lib.rs index cfafed4..3747e4e 100644 --- a/api/protos/src/lib.rs +++ b/api/protos/src/lib.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +extern crate core; + pub mod config; pub mod property; pub mod rpc; diff --git a/api/protos/src/rpc.rs b/api/protos/src/rpc.rs index 270c17e..7d90938 100644 --- a/api/protos/src/rpc.rs +++ b/api/protos/src/rpc.rs @@ -12,11 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Range; + use serde::Deserialize; use serde::Serialize; use crate::property::TopicProps; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Entry { + /// Index of this log entry, assigned by the broker when the message is published. Guaranteed + /// to be unique within the topic. It must not be populated by the publisher in a write + /// call. 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub index: Option, + /// A padded, base64-encoded string of bytes, encoded with the standard alphabet (not the + /// URL- and filename-safe "base64url" alphabet). Defined by [RFC4648]. + /// + /// [RFC4648]: https://datatracker.ietf.org/doc/html/rfc4648 + pub data: String, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CreateLogRequest { pub name: String, @@ -31,14 +47,13 @@ pub struct CreateLogResponse { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AppendLogRequest { pub name: String, - pub data: Vec, - pub entry_cnt: i32, + pub entries: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AppendLogResponse { - pub start_offset: i64, - pub end_offset: i64, + /// The half-open offset range of the appended entries. + pub offsets: Range, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -47,18 +62,9 @@ pub struct ReadLogRequest { pub offset: i64, } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ReadLogResponse { - pub data: Vec, -} - -// TODO(tisonkun): adopts bytes debug fmt or find other robust ways -impl std::fmt::Debug for ReadLogResponse { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ReadLogResponse") - .field("data", &String::from_utf8_lossy(&self.data)) - .finish() - } + pub entries: Vec, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] diff --git a/cmd/morax/src/command.rs b/cmd/morax/src/command.rs index 75ba9f3..8128c78 100644 --- a/cmd/morax/src/command.rs +++ b/cmd/morax/src/command.rs @@ -69,19 +69,25 @@ impl CommandStart { }; morax_runtime::init(&config.runtime); - ctrlc::set_handler(move || { - morax_runtime::shutdown(); - }) - .change_context_lazy(|| Error("failed to setup ctrl-c signal handle".to_string()))?; + morax_telemetry::init(&config.telemetry); + +
error_stack::Report::set_color_mode(error_stack::fmt::ColorMode::None); + error_stack::Report::set_charset(error_stack::fmt::Charset::Ascii); let rt = morax_runtime::make_runtime("morax-main", "morax-main", 1); rt.block_on(async move { - morax_telemetry::init(&config.telemetry); let state = morax_server::start(config.server) .await .change_context_lazy(|| { Error("A fatal error has occurred in Morax server process.".to_string()) })?; + + let shutdown_handle = state.shutdown_handle(); + ctrlc::set_handler(move || { + shutdown_handle(); + }) + .change_context_lazy(|| Error("failed to setup ctrl-c signal handle".to_string()))?; + state.await_shutdown().await; Ok(()) }) diff --git a/crates/kafka-broker/src/network.rs b/crates/kafka-broker/src/network.rs index 416359e..53d7951 100644 --- a/crates/kafka-broker/src/network.rs +++ b/crates/kafka-broker/src/network.rs @@ -69,10 +69,6 @@ pub async fn start_broker( log::info!("Morax Server is closing"); return Ok(()); } - _ = morax_runtime::wait_shutdown() => { - log::info!("Morax Server is shutting down"); - return Ok(()); - } socket = broker_listener.accept() => socket, }; @@ -100,10 +96,6 @@ async fn process_packet( log::info!("Morax Server is closing"); return Ok(()); } - _ = morax_runtime::wait_shutdown() => { - log::info!("Morax Server is shutting down"); - return Ok(()); - } closed = process_packet_one(&mut socket, &remote_addr, &broker) => { if closed? 
{ return Ok(()); diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 1a26c4d..cad7e61 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -26,6 +26,7 @@ version.workspace = true test = [] [dependencies] +better-panic = { workspace = true } error-stack = { workspace = true } fastrace = { workspace = true } futures = { workspace = true } @@ -38,7 +39,7 @@ tokio = { workspace = true } tokio-util = { workspace = true } [dev-dependencies] -pollster = { version = "0.3", features = ['macro'] } +pollster = { version = "0.3" } [lints] workspace = true diff --git a/crates/runtime/src/global.rs b/crates/runtime/src/global.rs index b8112fb..4480e7b 100644 --- a/crates/runtime/src/global.rs +++ b/crates/runtime/src/global.rs @@ -12,10 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; use std::sync::OnceLock; -use latches::task::Latch; use serde::Deserialize; use serde::Serialize; @@ -45,7 +43,6 @@ struct GlobalRuntimes { meta_runtime: Runtime, data_runtime: Runtime, bg_runtime: Runtime, - shutdown: Arc, } static GLOBAL_RUNTIMES: OnceLock = OnceLock::new(); @@ -84,23 +81,31 @@ pub fn init(opts: &RuntimeOptions) { GLOBAL_RUNTIMES.get_or_init(|| do_initialize_runtimes(opts)); } +fn setup_panic_hook() { + std::panic::set_hook(Box::new(move |info| { + let backtrace = std::backtrace::Backtrace::capture(); + log::error!("panic occurred: {info}\nbacktrace:\n{backtrace}"); + better_panic::Settings::auto().create_panic_handler()(info); + shutdown(); + })); +} + fn do_initialize_runtimes(opts: &RuntimeOptions) -> GlobalRuntimes { log::info!("initializing global runtimes: {opts:?}"); + setup_panic_hook(); + let api_runtime = make_runtime("api_runtime", "api_thread", opts.api_runtime_threads); let exec_runtime = make_runtime("exec_runtime", "exec_thread", opts.exec_runtime_threads); let meta_runtime = make_runtime("meta_runtime", "meta_thread", 
opts.meta_runtime_threads); let data_runtime = make_runtime("data_runtime", "data_thread", opts.data_runtime_threads); let bg_runtime = make_runtime("bg_runtime", "bg_thread", opts.bg_runtime_threads); - let shutdown = Arc::new(Latch::new(1)); - GlobalRuntimes { api_runtime, exec_runtime, meta_runtime, data_runtime, bg_runtime, - shutdown, } } @@ -129,20 +134,12 @@ pub fn bg_runtime() -> &'static Runtime { } pub fn shutdown() { - let runtimes = fetch_runtimes_or_default(); - runtimes.shutdown.count_down(); -} - -pub async fn wait_shutdown() { - fetch_runtimes_or_default().shutdown.wait().await; + log::info!("The Global Runtimes are shutting down."); + std::process::exit(1); } #[cfg(test)] mod tests { - use core::panic; - - use pollster::FutureExt; - use super::*; #[test] @@ -180,10 +177,4 @@ mod tests { assert_eq!(out, "hello") } } - - #[test] - fn test_task_panic() { - let _fut = exec_runtime().spawn(async { panic!("test panic") }); - wait_shutdown().block_on(); - } } diff --git a/crates/runtime/src/runtime.rs b/crates/runtime/src/runtime.rs index 2cbddc9..f78731f 100644 --- a/crates/runtime/src/runtime.rs +++ b/crates/runtime/src/runtime.rs @@ -17,7 +17,6 @@ use std::any::type_name; use std::future::Future; use std::panic::resume_unwind; -use std::panic::AssertUnwindSafe; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -26,7 +25,6 @@ use std::time::Duration; use fastrace::future::FutureExt as _; use fastrace::Span; use futures::ready; -use futures::FutureExt as _; static RUNTIME_ID: AtomicUsize = AtomicUsize::new(0); @@ -42,51 +40,29 @@ impl Runtime { Builder::default() } - /// Spawn a future and execute it in this thread pool + /// Spawn a future and execute it in this thread pool. /// - /// Similar to tokio::runtime::Runtime::spawn() - #[must_use = "this task may panic, join it to properly observe panics"] + /// Similar to [`tokio::runtime::Runtime::spawn`]. 
pub fn spawn(&self, future: F) -> JoinHandle where F: Future + Send + 'static, F::Output: Send + 'static, { let name = type_name::(); - let catch_unwind = async { - AssertUnwindSafe(future) - .catch_unwind() - .await - .map_err(|payload| -> ! { - log::error!("task panicked: {:?}", payload); - crate::shutdown(); - resume_unwind(payload) - }) - .into_ok() - }; - JoinHandle::new( - self.runtime - .spawn(catch_unwind.in_span(Span::enter_with_local_parent(name))), - ) + let handle = self + .runtime + .spawn(future.in_span(Span::enter_with_local_parent(name))); + JoinHandle::new(handle) } /// Run the provided function on an executor dedicated to blocking /// operations. - #[must_use = "this task may panic, join it to properly observe panics"] pub fn spawn_blocking(&self, func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let catch_unwind = || { - std::panic::catch_unwind(AssertUnwindSafe(func)) - .map_err(|payload| -> ! { - log::error!("task panicked: {:?}", payload); - crate::shutdown(); - resume_unwind(payload) - }) - .into_ok() - }; - JoinHandle::new(self.runtime.spawn_blocking(catch_unwind)) + JoinHandle::new(self.runtime.spawn_blocking(func)) } /// Run a future to complete, this is the runtime entry point @@ -133,7 +109,6 @@ impl Future for JoinHandle { Ok(val) => std::task::Poll::Ready(Ok(val)), Err(err) => { if err.is_panic() { - crate::shutdown(); resume_unwind(err.into_panic()) } else { std::task::Poll::Ready(Err(CanceledError)) diff --git a/crates/runtime/src/wait_group.rs b/crates/runtime/src/wait_group.rs index 4e38f82..a28eef1 100644 --- a/crates/runtime/src/wait_group.rs +++ b/crates/runtime/src/wait_group.rs @@ -108,7 +108,7 @@ mod test { for _ in 0..100 { let w = wg.clone(); - let _drop = test_runtime().spawn(async move { + test_runtime().spawn(async move { drop(w); }); } diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index 94e366a..20b14a3 100644 --- a/crates/server/src/lib.rs +++ 
b/crates/server/src/lib.rs @@ -49,11 +49,18 @@ impl ServerState { self.wal_broker_addr } + pub fn shutdown_handle(&self) -> impl Fn() { + let shutdown = self.shutdown.clone(); + move || shutdown.count_down() + } + pub fn shutdown(&self) { - self.shutdown.count_down(); + self.shutdown_handle()(); } pub async fn await_shutdown(self) { + self.shutdown.wait().await; + match futures::future::try_join_all(vec![ flatten(self.kafka_broker_fut), flatten(self.wal_broker_fut), diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 83d281d..93933b0 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -50,6 +50,7 @@ impl TopicStorage { records: Vec, ) -> Result { let op = self.op()?; + // TODO(tisonkun): decide whether to use a sequential number rather than a UUID let split_id = uuid::Uuid::new_v4(); let split_url = format!("{topic_name}/{partition_id}/{split_id}"); op.write(&split_url, records) diff --git a/crates/wal-broker/Cargo.toml b/crates/wal-broker/Cargo.toml index 3a5c4cd..c3b5f6a 100644 --- a/crates/wal-broker/Cargo.toml +++ b/crates/wal-broker/Cargo.toml @@ -23,7 +23,9 @@ repository.workspace = true version.workspace = true [dependencies] +base64 = { workspace = true } error-stack = { workspace = true } +flexbuffers = { workspace = true } log = { workspace = true } mime = { workspace = true } morax-meta = { workspace = true } @@ -31,6 +33,7 @@ morax-protos = { workspace = true } morax-runtime = { workspace = true } morax-storage = { workspace = true } poem = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } diff --git a/crates/wal-broker/src/wal.rs b/crates/wal-broker/src/wal.rs index b8b196d..9fa687d 100644 --- a/crates/wal-broker/src/wal.rs +++ b/crates/wal-broker/src/wal.rs @@ -14,6 +14,8 @@ use std::sync::Arc; +use base64::prelude::BASE64_STANDARD; +use base64::Engine; use error_stack::Result; use error_stack::ResultExt; use morax_meta::CommitRecordBatchesRequest; @@
-24,12 +26,21 @@ use morax_protos::rpc::AppendLogRequest; use morax_protos::rpc::AppendLogResponse; use morax_protos::rpc::CreateLogRequest; use morax_protos::rpc::CreateLogResponse; +use morax_protos::rpc::Entry; use morax_protos::rpc::ReadLogRequest; use morax_protos::rpc::ReadLogResponse; use morax_storage::TopicStorage; +use serde::Deserialize; +use serde::Serialize; use crate::BrokerError; +// TODO(tisonkun): figure out whether flexbuffers is the proper format +#[derive(Debug, Clone, Serialize, Deserialize)] +struct EntryData { + data: Vec, +} + #[derive(Debug, Clone)] pub struct WALBroker { meta: Arc, @@ -82,17 +93,32 @@ impl WALBroker { .await .change_context_lazy(make_error)?; - let mut data = vec![]; + let mut entries = vec![]; for split in splits { debug_assert_eq!(&split.topic_name, &topic.name); debug_assert_eq!(split.partition_id, 0); - let mut part = topic_storage + let data = topic_storage .read_at(&split.topic_name, split.partition_id, &split.split_id) .await .change_context_lazy(make_error)?; - data.append(&mut part); + let deserializer = + flexbuffers::Reader::get_root(data.as_slice()).change_context_lazy(|| { + BrokerError("failed to deserialize entry data".to_string()) + })?; + let entry_data = + Vec::::deserialize(deserializer).change_context_lazy(|| { + BrokerError("failed to deserialize entry data".to_string()) + })?; + for (i, entry_data) in entry_data.into_iter().enumerate() { + if split.start_offset + i as i64 >= request.offset { + entries.push(Entry { + index: Some(split.start_offset + i as i64), + data: BASE64_STANDARD.encode(&entry_data.data), + }); + } + } } - Ok(ReadLogResponse { data }) + Ok(ReadLogResponse { entries }) } pub async fn append( @@ -109,8 +135,28 @@ impl WALBroker { .change_context_lazy(make_error)?; let topic_storage = TopicStorage::new(topic.properties.0.storage); + let entry_cnt = request.entries.len(); + let entry_data = { + let mut entry_data = vec![]; + for entry in request.entries.into_iter() { + 
entry_data.push(EntryData { + data: BASE64_STANDARD + .decode(entry.data.as_bytes()) + .change_context_lazy(|| { + BrokerError(format!("failed to decode base64: {:?}", entry.data)) + })?, + }); + } + let mut serializer = flexbuffers::FlexbufferSerializer::new(); + entry_data + .serialize(&mut serializer) + .change_context_lazy(|| { + BrokerError("failed to serialize entry data".to_string()) + })?; + serializer.take_buffer() + }; let split_id = topic_storage - .write_to(&topic.name, 0, request.data) + .write_to(&topic.name, 0, entry_data) .await .change_context_lazy(make_error)?; @@ -119,15 +165,14 @@ impl WALBroker { .commit_record_batches(CommitRecordBatchesRequest { topic_name: name.clone(), partition_id: 0, - record_len: request.entry_cnt, + record_len: entry_cnt as i32, split_id, }) .await .change_context_lazy(make_error)?; Ok(AppendLogResponse { - start_offset, - end_offset, + offsets: start_offset..end_offset, }) } } diff --git a/tests/wal/Cargo.toml b/tests/wal/Cargo.toml index 6a44a1d..be1fd74 100644 --- a/tests/wal/Cargo.toml +++ b/tests/wal/Cargo.toml @@ -24,6 +24,7 @@ repository.workspace = true version.workspace = true [dependencies] +base64 = { workspace = true } log = { workspace = true } morax-protos = { workspace = true } morax-runtime = { workspace = true, features = ["test"] } diff --git a/tests/wal/tests/read_after_append.rs b/tests/wal/tests/read_after_append.rs index d36c862..0b29ed1 100644 --- a/tests/wal/tests/read_after_append.rs +++ b/tests/wal/tests/read_after_append.rs @@ -12,14 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use base64::prelude::BASE64_STANDARD; +use base64::Engine; use insta::assert_compact_debug_snapshot; use morax_protos::rpc::AppendLogRequest; use morax_protos::rpc::CreateLogRequest; +use morax_protos::rpc::Entry; use morax_protos::rpc::ReadLogRequest; use test_harness::test; use wal_tests::harness; use wal_tests::Testkit; +fn make_entry(payload: &str) -> Entry { + Entry { + index: None, + data: BASE64_STANDARD.encode(payload), + } +} + #[test(harness)] async fn test_simple_pubsub(testkit: Testkit) { let name = "db_log".to_string(); @@ -39,17 +49,16 @@ async fn test_simple_pubsub(testkit: Testkit) { .client .append_log(AppendLogRequest { name: name.clone(), - data: "0;1;2;3;4;5;6;7;8;9;".as_bytes().to_vec(), - entry_cnt: 10, + entries: vec![make_entry("0"), make_entry("1")], }) .await .unwrap(); - assert_compact_debug_snapshot!(r, @"Success(AppendLogResponse { start_offset: 0, end_offset: 10 })"); + assert_compact_debug_snapshot!(r, @"Success(AppendLogResponse { offsets: 0..2 })"); let r = testkit .client .read_log(ReadLogRequest { name, offset: 0 }) .await .unwrap(); - assert_compact_debug_snapshot!(r, @r###"Success(ReadLogResponse { data: "0;1;2;3;4;5;6;7;8;9;" })"###); + assert_compact_debug_snapshot!(r, @r###"Success(ReadLogResponse { entries: [Entry { index: Some(0), data: "MA==" }, Entry { index: Some(1), data: "MQ==" }] })"###); }