Skip to content

Commit

Permalink
feat(eventing): add io-engine events
Browse files Browse the repository at this point in the history
Signed-off-by: Vandana Varakantham <[email protected]>
  • Loading branch information
datacore-vvarakantham committed Jan 9, 2024
1 parent 5c3ae08 commit edde7a5
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 10 deletions.
21 changes: 13 additions & 8 deletions io-engine/src/bdev/nexus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::{pin::Pin, sync::atomic::AtomicBool};

use crate::core::VerboseError;
use events_api::event::EventAction;
use futures::{future::Future, FutureExt};

mod nexus_bdev;
Expand All @@ -21,7 +22,7 @@ mod nexus_nbd;
mod nexus_persistence;
mod nexus_share;

use crate::bdev::nexus::nexus_iter::NexusIterMut;
use crate::{bdev::nexus::nexus_iter::NexusIterMut, eventing::Event};
pub(crate) use nexus_bdev::NEXUS_PRODUCT_ID;
pub use nexus_bdev::{
nexus_create,
Expand Down Expand Up @@ -173,15 +174,19 @@ pub async fn shutdown_nexuses() {

for mut nexus in nexuses.into_iter() {
// Destroy nexus and persist its state in the ETCd.
if let Err(error) = nexus.as_mut().destroy_ext(true).await {
error!(
name = nexus.name,
error = error.verbose(),
"Failed to destroy nexus"
);
match nexus.as_mut().destroy_ext(true).await {
Ok(_) => {
nexus.event(EventAction::Shutdown).generate();
}
Err(error) => {
error!(
name = nexus.name,
error = error.verbose(),
"Failed to destroy nexus"
);
}
}
}

info!("All nexus have been shutdown.");
}

Expand Down
4 changes: 4 additions & 0 deletions io-engine/src/bin/io-engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ extern crate tracing;

use std::{env, path::Path, sync::atomic::Ordering};

use events_api::event::EventAction;

use futures::future::FutureExt;

use io_engine::{
Expand All @@ -25,6 +27,7 @@ use io_engine::{
Mthread,
Reactors,
},
eventing::Event,
grpc,
logger,
persistent_store::PersistentStoreBuilder,
Expand Down Expand Up @@ -270,5 +273,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
Reactors::current().poll_reactor();

ms.fini();
ms.event(EventAction::Start).generate();
Ok(())
}
8 changes: 8 additions & 0 deletions io-engine/src/core/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::{

use byte_unit::{Byte, ByteUnit};
use clap::Parser;
use events_api::event::EventAction;
use futures::{channel::oneshot, future};
use http::Uri;
use once_cell::sync::{Lazy, OnceCell};
Expand Down Expand Up @@ -49,6 +50,7 @@ use crate::{
MayastorFeatures,
Mthread,
},
eventing::Event,
grpc,
grpc::MayastorGrpcServer,
logger,
Expand Down Expand Up @@ -466,6 +468,9 @@ async fn do_shutdown(arg: *mut c_void) {
spdk_rpc_finish();
spdk_subsystem_fini(Some(reactors_stop), arg);
}
MayastorEnvironment::global_or_default()
.event(EventAction::Shutdown)
.generate();
}

/// main shutdown routine for mayastor
Expand All @@ -477,6 +482,9 @@ pub fn mayastor_env_stop(rc: i32) {
r.send_future(async move {
do_shutdown(rc as *const i32 as *mut c_void).await;
});
MayastorEnvironment::global_or_default()
.event(EventAction::Stop)
.generate();
}
_ => {
panic!("invalid reactor state during shutdown");
Expand Down
8 changes: 7 additions & 1 deletion io-engine/src/core/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use std::{
use once_cell::sync::OnceCell;

use crossbeam::channel::{unbounded, Receiver, Sender};
use events_api::event::EventAction;
use futures::{
channel::oneshot::{Receiver as OnceShotRecv, Sender as OneShotSend},
task::{Context, Poll},
Expand All @@ -61,7 +62,10 @@ use spdk_rs::libspdk::{
SPDK_THREAD_OP_NEW,
};

use crate::core::{CoreError, Cores};
use crate::{
core::{CoreError, Cores},
eventing::Event,
};
use gettid::gettid;
use nix::errno::Errno;

Expand Down Expand Up @@ -752,12 +756,14 @@ pub async fn reactor_monitor_loop(freeze_timeout: Option<u64>) {
if tick - r.reactor_tick.load(Ordering::Relaxed) == 0 {
info!(core = r.core, "Reactor is healthy again");
r.frozen = false;
r.reactor.event(EventAction::ReactorUnfreeze).generate();
}
} else {
// Reactor didn't respond within allowed number of intervals,
// assume it is frozen.
if tick - r.reactor_tick.load(Ordering::Relaxed) >= timeout {
r.frozen = true;
r.reactor.event(EventAction::ReactorFreeze).generate();
crate::core::diagnostics::diagnose_reactor(r.reactor);
}
}
Expand Down
44 changes: 44 additions & 0 deletions io-engine/src/eventing/io_engine_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use events_api::event::{
EventAction,
EventCategory,
EventMessage,
EventMeta,
EventSource,
};

use crate::{
core::{MayastorEnvironment, Reactor},
eventing::Event,
};

// Io-engine event message from Mayastor env data.
impl Event for MayastorEnvironment {
fn event(&self, event_action: EventAction) -> EventMessage {
let event_source = EventSource::new(self.node_name.clone());
EventMessage {
category: EventCategory::IoEngineCategory as i32,
action: event_action as i32,
target: self.name.clone(),
metadata: Some(EventMeta::from_source(event_source)),
}
}
}

// Reactor event message from Reactor data.
impl Event for Reactor {
fn event(&self, event_action: EventAction) -> EventMessage {
let event_source = EventSource::new(
MayastorEnvironment::global_or_default().node_name,
)
.with_reactor_details(
self.core().into(),
&self.get_state().to_string(),
);
EventMessage {
category: EventCategory::IoEngineCategory as i32,
action: event_action as i32,
target: self.tid().to_string(),
metadata: Some(EventMeta::from_source(event_source)),
}
}
}
3 changes: 2 additions & 1 deletion io-engine/src/eventing/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
pub(crate) mod host_events;
mod io_engine_events;
mod nexus_child_events;
pub(crate) mod nexus_events;
mod pool_events;
mod replica_events;
use events_api::event::{EventAction, EventMessage, EventMeta};

/// Event trait definition for creating events.
pub(crate) trait Event {
pub trait Event {
/// Create event message.
fn event(&self, event_action: EventAction) -> EventMessage;
}
Expand Down

0 comments on commit edde7a5

Please sign in to comment.