diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs
index c487742039a..733f1ebacaf 100644
--- a/influxdb3_processing_engine/src/lib.rs
+++ b/influxdb3_processing_engine/src/lib.rs
@@ -45,10 +45,12 @@ pub struct ProcessingEngineManagerImpl {
 
 #[derive(Debug, Default)]
 struct PluginChannels {
-    /// Map of database to trigger name to sender
-    active_triggers: HashMap<String, HashMap<String, mpsc::Sender<PluginEvent>>>,
-    /// Map of request path to the sender
-    request_triggers: HashMap<String, mpsc::Sender<PluginEvent>>,
+    /// Map of database to wal trigger name to handler
+    wal_triggers: HashMap<String, HashMap<String, mpsc::Sender<WalEvent>>>,
+    /// Map of database to schedule trigger name to handler
+    schedule_triggers: HashMap<String, HashMap<String, mpsc::Sender<ScheduleEvent>>>,
+    /// Map of request path to the request trigger handler
+    request_triggers: HashMap<String, mpsc::Sender<RequestEvent>>,
 }
 
 #[cfg(feature = "system-py")]
@@ -60,77 +62,115 @@ impl PluginChannels {
         &self,
         db: String,
         trigger: String,
+        trigger_spec: &TriggerSpecificationDefinition,
     ) -> Result<Option<oneshot::Receiver<()>>, ProcessingEngineError> {
-        if let Some(trigger_map) = self.active_triggers.get(&db) {
-            if let Some(sender) = trigger_map.get(&trigger) {
-                // create a one shot to wait for the shutdown to complete
-                let (tx, rx) = oneshot::channel();
-                if sender.send(PluginEvent::Shutdown(tx)).await.is_err() {
-                    return Err(ProcessingEngineError::TriggerShutdownError {
-                        database: db,
-                        trigger_name: trigger,
-                    });
+        match trigger_spec {
+            TriggerSpecificationDefinition::SingleTableWalWrite { .. }
+            | TriggerSpecificationDefinition::AllTablesWalWrite => {
+                if let Some(trigger_map) = self.wal_triggers.get(&db) {
+                    if let Some(sender) = trigger_map.get(&trigger) {
+                        // create a one shot to wait for the shutdown to complete
+                        let (tx, rx) = oneshot::channel();
+                        if sender.send(WalEvent::Shutdown(tx)).await.is_err() {
+                            return Err(ProcessingEngineError::TriggerShutdownError {
+                                database: db,
+                                trigger_name: trigger,
+                            });
+                        }
+                        return Ok(Some(rx));
+                    }
+                }
+            }
+            TriggerSpecificationDefinition::Schedule { .. }
+            | TriggerSpecificationDefinition::Every { .. } => {
+                if let Some(trigger_map) = self.schedule_triggers.get(&db) {
+                    if let Some(sender) = trigger_map.get(&trigger) {
+                        // create a one shot to wait for the shutdown to complete
+                        let (tx, rx) = oneshot::channel();
+                        if sender.send(ScheduleEvent::Shutdown(tx)).await.is_err() {
+                            return Err(ProcessingEngineError::TriggerShutdownError {
+                                database: db,
+                                trigger_name: trigger,
+                            });
+                        }
+                        return Ok(Some(rx));
+                    }
+                }
+            }
+            TriggerSpecificationDefinition::RequestPath { .. } => {
+                if let Some(sender) = self.request_triggers.get(&trigger) {
+                    // create a one shot to wait for the shutdown to complete
+                    let (tx, rx) = oneshot::channel();
+                    if sender.send(RequestEvent::Shutdown(tx)).await.is_err() {
+                        return Err(ProcessingEngineError::TriggerShutdownError {
+                            database: db,
+                            trigger_name: trigger,
+                        });
+                    }
+                    return Ok(Some(rx));
                 }
-                return Ok(Some(rx));
             }
         }
-        Ok(None)
-    }
 
-    fn remove_trigger(&mut self, db: String, trigger: String) {
-        if let Some(trigger_map) = self.active_triggers.get_mut(&db) {
-            trigger_map.remove(&trigger);
-        }
+        Ok(None)
     }
 
-    #[cfg(feature = "system-py")]
-    fn add_trigger(
+    fn remove_trigger(
         &mut self,
-        trigger_spec: &TriggerSpecificationDefinition,
         db: String,
         trigger: String,
-    ) -> mpsc::Receiver<PluginEvent> {
-        observability_deps::tracing::info!(%db, ?trigger, ?trigger_spec, "adding trigger to plugin event channel");
-        let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
-
+        trigger_spec: &TriggerSpecificationDefinition,
+    ) {
         match trigger_spec {
-            TriggerSpecificationDefinition::SingleTableWalWrite { .. } => {
-                self.active_triggers
-                    .entry(db)
-                    .or_default()
-                    .insert(trigger, tx);
-            }
-            TriggerSpecificationDefinition::AllTablesWalWrite => {
-                self.active_triggers
-                    .entry(db)
-                    .or_default()
-                    .insert(trigger, tx);
-            }
-            TriggerSpecificationDefinition::Schedule { .. } => {
-                self.active_triggers
-                    .entry(db)
-                    .or_default()
-                    .insert(trigger, tx);
+            TriggerSpecificationDefinition::SingleTableWalWrite { .. }
+            | TriggerSpecificationDefinition::AllTablesWalWrite => {
+                if let Some(trigger_map) = self.wal_triggers.get_mut(&db) {
+                    trigger_map.remove(&trigger);
+                }
             }
-            TriggerSpecificationDefinition::Every { .. } => {
-                self.active_triggers
-                    .entry(db)
-                    .or_default()
-                    .insert(trigger, tx);
+            TriggerSpecificationDefinition::Schedule { .. }
+            | TriggerSpecificationDefinition::Every { .. } => {
+                if let Some(trigger_map) = self.schedule_triggers.get_mut(&db) {
+                    trigger_map.remove(&trigger);
+                }
             }
-            TriggerSpecificationDefinition::RequestPath { path } => {
-                self.request_triggers.insert(path.to_string(), tx);
+            TriggerSpecificationDefinition::RequestPath { .. } => {
+                self.request_triggers.remove(&trigger);
             }
         }
+    }
+
+    #[cfg(feature = "system-py")]
+    fn add_wal_trigger(&mut self, db: String, trigger: String) -> mpsc::Receiver<WalEvent> {
+        let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
+        self.wal_triggers.entry(db).or_default().insert(trigger, tx);
+        rx
+    }
+
+    fn add_schedule_trigger(
+        &mut self,
+        db: String,
+        trigger: String,
+    ) -> mpsc::Receiver<ScheduleEvent> {
+        let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
+        self.schedule_triggers
+            .entry(db)
+            .or_default()
+            .insert(trigger, tx);
+        rx
+    }
 
+    fn add_request_trigger(&mut self, path: String) -> mpsc::Receiver<RequestEvent> {
+        let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
+        self.request_triggers.insert(path, tx);
         rx
     }
 
     async fn send_wal_contents(&self, wal_contents: Arc<WalContents>) {
-        for (db, trigger_map) in &self.active_triggers {
+        for (db, trigger_map) in &self.wal_triggers {
             for (trigger, sender) in trigger_map {
                 if let Err(e) = sender
-                    .send(PluginEvent::WriteWalContents(Arc::clone(&wal_contents)))
+                    .send(WalEvent::WriteWalContents(Arc::clone(&wal_contents)))
                     .await
                 {
                     warn!(%e, %db, ?trigger, "error sending wal contents to plugin");
@@ -144,7 +184,7 @@ impl PluginChannels {
         trigger_path: &str,
         request: Request,
     ) -> Result<(), ProcessingEngineError> {
-        let event = PluginEvent::Request(request);
+        let event = RequestEvent::Request(request);
         if let Some(sender) = self.request_triggers.get(trigger_path) {
             if sender.send(event).await.is_err() {
                 return Err(ProcessingEngineError::RequestTriggerNotFound);
@@ -450,14 +490,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
             })?
             .clone();
 
-        let trigger_rx = self.plugin_event_tx.write().await.add_trigger(
-            &trigger.trigger,
-            db_name.to_string(),
-            trigger_name.to_string(),
-        );
-
         let plugin_context = PluginContext {
-            trigger_rx,
             write_buffer,
             query_executor,
         };
@@ -473,25 +506,52 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
             .into());
         };
         match plugin_definition.plugin_type {
-            PluginType::WalRows => plugins::run_wal_contents_plugin(
-                db_name.to_string(),
-                plugin_code,
-                trigger,
-                plugin_context,
-            ),
-            PluginType::Scheduled => plugins::run_schedule_plugin(
-                db_name.to_string(),
-                plugin_code,
-                trigger,
-                Arc::clone(&self.time_provider),
-                plugin_context,
-            )?,
-            PluginType::Request => plugins::run_request_plugin(
-                db_name.to_string(),
-                plugin_code,
-                trigger,
-                plugin_context,
-            ),
+            PluginType::WalRows => {
+                let rec = self
+                    .plugin_event_tx
+                    .write()
+                    .await
+                    .add_wal_trigger(db_name.to_string(), trigger_name.to_string());
+
+                plugins::run_wal_contents_plugin(
+                    db_name.to_string(),
+                    plugin_code,
+                    trigger,
+                    plugin_context,
+                    rec,
+                )
+            }
+            PluginType::Scheduled => {
+                let rec = self
+                    .plugin_event_tx
+                    .write()
+                    .await
+                    .add_schedule_trigger(db_name.to_string(), trigger_name.to_string());
+
+                plugins::run_schedule_plugin(
+                    db_name.to_string(),
+                    plugin_code,
+                    trigger,
+                    Arc::clone(&self.time_provider),
+                    plugin_context,
+                    rec,
+                )?
+            }
+            PluginType::Request => {
+                let rec = self
+                    .plugin_event_tx
+                    .write()
+                    .await
+                    .add_request_trigger(trigger_name.to_string());
+
+                plugins::run_request_plugin(
+                    db_name.to_string(),
+                    plugin_code,
+                    trigger,
+                    plugin_context,
+                    rec,
+                )
+            }
         }
     }
 
@@ -537,7 +597,11 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
             .plugin_event_tx
             .write()
             .await
-            .send_shutdown(db_name.to_string(), trigger_name.to_string())
+            .send_shutdown(
+                db_name.to_string(),
+                trigger_name.to_string(),
+                &trigger.trigger,
+            )
            .await?
         else {
             return Ok(());
@@ -548,10 +612,11 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
                 "shutdown trigger receiver dropped, may have received multiple shutdown requests"
             );
         } else {
-            self.plugin_event_tx
-                .write()
-                .await
-                .remove_trigger(db_name.to_string(), trigger_name.to_string());
+            self.plugin_event_tx.write().await.remove_trigger(
+                db_name.to_string(),
+                trigger_name.to_string(),
+                &trigger.trigger,
+            );
         }
 
         Ok(())
@@ -738,9 +803,16 @@ impl WalFileNotifier for ProcessingEngineManagerImpl {
     }
 }
 
-#[allow(unused)]
-pub(crate) enum PluginEvent {
+pub(crate) enum WalEvent {
     WriteWalContents(Arc<WalContents>),
+    Shutdown(oneshot::Sender<()>),
+}
+
+pub(crate) enum ScheduleEvent {
+    Shutdown(oneshot::Sender<()>),
+}
+
+pub(crate) enum RequestEvent {
     Request(Request),
     Shutdown(oneshot::Sender<()>),
 }
diff --git a/influxdb3_processing_engine/src/plugins.rs b/influxdb3_processing_engine/src/plugins.rs
index 5d9b9bb3991..cc19d7a6f7f 100644
--- a/influxdb3_processing_engine/src/plugins.rs
+++ b/influxdb3_processing_engine/src/plugins.rs
@@ -1,7 +1,7 @@
 #[cfg(feature = "system-py")]
 use crate::PluginCode;
+use crate::{RequestEvent, ScheduleEvent, WalEvent};
 #[cfg(feature = "system-py")]
-use crate::PluginEvent;
 use data_types::NamespaceName;
 use hashbrown::HashMap;
 use influxdb3_catalog::catalog::Catalog;
@@ -73,6 +73,7 @@ pub(crate) fn run_wal_contents_plugin(
     plugin_code: PluginCode,
     trigger_definition: TriggerDefinition,
     context: PluginContext,
+    plugin_receiver: mpsc::Receiver<WalEvent>,
 ) {
     let trigger_plugin = TriggerPlugin {
         trigger_definition,
@@ -83,7 +84,7 @@ pub(crate) fn run_wal_contents_plugin(
     };
     tokio::task::spawn(async move {
         trigger_plugin
-            .run_wal_contents_plugin(context.trigger_rx)
+            .run_wal_contents_plugin(plugin_receiver)
             .await
             .expect("trigger plugin failed");
     });
@@ -96,7 +97,13 @@ pub(crate) fn run_schedule_plugin(
     trigger_definition: TriggerDefinition,
     time_provider: Arc<dyn TimeProvider>,
     context: PluginContext,
+    plugin_receiver: mpsc::Receiver<ScheduleEvent>,
 ) -> Result<(), Error> {
+    let TriggerSpecificationDefinition::Schedule { .. } = &trigger_definition.trigger else {
+        // TODO: these linkages should be guaranteed by code.
+        unreachable!("this should've been checked");
+    };
+
     let trigger_plugin = TriggerPlugin {
         trigger_definition,
         db_name,
@@ -104,16 +111,18 @@ pub(crate) fn run_schedule_plugin(
         write_buffer: context.write_buffer,
         query_executor: context.query_executor,
     };
+
     let runner = python_plugin::ScheduleTriggerRunner::try_new(
         &trigger_plugin.trigger_definition.trigger,
         Arc::clone(&time_provider),
     )?;
     tokio::task::spawn(async move {
         trigger_plugin
-            .run_schedule_plugin(context.trigger_rx, runner, time_provider)
+            .run_schedule_plugin(plugin_receiver, runner, time_provider)
             .await
             .expect("cron trigger plugin failed");
     });
+
     Ok(())
 }
 
@@ -123,6 +132,7 @@ pub(crate) fn run_request_plugin(
     plugin_code: PluginCode,
     trigger_definition: TriggerDefinition,
     context: PluginContext,
+    plugin_receiver: mpsc::Receiver<RequestEvent>,
 ) {
     let trigger_plugin = TriggerPlugin {
         trigger_definition,
@@ -133,7 +143,7 @@ pub(crate) fn run_request_plugin(
     };
     tokio::task::spawn(async move {
         trigger_plugin
-            .run_request_plugin(context.trigger_rx)
+            .run_request_plugin(plugin_receiver)
             .await
             .expect("trigger plugin failed");
     });
@@ -141,8 +151,6 @@ pub(crate) fn run_request_plugin(
 
 #[cfg(feature = "system-py")]
 pub(crate) struct PluginContext {
-    // tokio channel for inputs
-    pub(crate) trigger_rx: mpsc::Receiver<PluginEvent>,
     // handler to write data back to the DB.
     pub(crate) write_buffer: Arc<dyn WriteBuffer>,
     // query executor to hand off to the plugin
@@ -185,7 +193,7 @@ mod python_plugin {
     impl TriggerPlugin {
         pub(crate) async fn run_wal_contents_plugin(
             &self,
-            mut receiver: Receiver<PluginEvent>,
+            mut receiver: Receiver<WalEvent>,
         ) -> Result<(), Error> {
             info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_name, "starting wal contents plugin");
 
@@ -199,15 +207,12 @@ mod python_plugin {
                 };
 
                 match event {
-                    PluginEvent::WriteWalContents(wal_contents) => {
+                    WalEvent::WriteWalContents(wal_contents) => {
                         if let Err(e) = self.process_wal_contents(wal_contents).await {
                             error!(?self.trigger_definition, "error processing wal contents: {}", e);
                         }
                     }
-                    PluginEvent::Request(_) => {
-                        warn!("ignoring request in wal contents plugin.")
-                    }
-                    PluginEvent::Shutdown(sender) => {
+                    WalEvent::Shutdown(sender) => {
                         sender.send(()).map_err(|_| Error::FailedToShutdown)?;
                         break;
                     }
@@ -219,7 +224,7 @@ mod python_plugin {
 
         pub(crate) async fn run_schedule_plugin(
             &self,
-            mut receiver: Receiver<PluginEvent>,
+            mut receiver: Receiver<ScheduleEvent>,
             mut runner: ScheduleTriggerRunner,
             time_provider: Arc<dyn TimeProvider>,
         ) -> Result<(), Error> {
@@ -241,13 +246,7 @@ mod python_plugin {
                         warn!(?self.trigger_definition, "trigger plugin receiver closed");
                         break;
                     }
-                    Some(PluginEvent::WriteWalContents(_)) => {
-                        warn!("ignoring wal contents in cron plugin.")
-                    }
-                    Some(PluginEvent::Request(_)) => {
-                        warn!("ignoring request in cron plugin.")
-                    }
-                    Some(PluginEvent::Shutdown(sender)) => {
+                    Some(ScheduleEvent::Shutdown(sender)) => {
                         sender.send(()).map_err(|_| Error::FailedToShutdown)?;
                         break;
                     }
@@ -261,7 +260,7 @@ mod python_plugin {
 
         pub(crate) async fn run_request_plugin(
             &self,
-            mut receiver: Receiver<PluginEvent>,
+            mut receiver: Receiver<RequestEvent>,
         ) -> Result<(), Error> {
             info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_name, "starting request plugin");
 
@@ -271,10 +270,7 @@ mod python_plugin {
                         warn!(?self.trigger_definition, "trigger plugin receiver closed");
                         break;
                     }
-                    Some(PluginEvent::WriteWalContents(_)) => {
-                        warn!("ignoring wal contents in request plugin.")
-                    }
-                    Some(PluginEvent::Request(request)) => {
+                    Some(RequestEvent::Request(request)) => {
                         let Some(schema) =
                             self.write_buffer.catalog().db_schema(self.db_name.as_str())
                         else {
@@ -336,7 +332,7 @@ mod python_plugin {
                             error!(?self.trigger_definition, "error sending response");
                         }
                     }
-                    Some(PluginEvent::Shutdown(sender)) => {
+                    Some(RequestEvent::Shutdown(sender)) => {
                         sender.send(()).map_err(|_| Error::FailedToShutdown)?;
                         break;
                     }