Skip to content

Commit

Permalink
feat: each plugin type uses its own channel
Browse files Browse the repository at this point in the history
This cleans up how plugins receive their triggering events. It'll also clean up the logs a bit without warnings that plugin events of the wrong type are going to different plugins.

Closes #25879
  • Loading branch information
pauldix committed Jan 23, 2025
1 parent f1ea2d8 commit 7c38b8b
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 113 deletions.
246 changes: 159 additions & 87 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ pub struct ProcessingEngineManagerImpl {

#[derive(Debug, Default)]
struct PluginChannels {
/// Map of database to trigger name to sender
active_triggers: HashMap<String, HashMap<String, mpsc::Sender<PluginEvent>>>,
/// Map of request path to the sender
request_triggers: HashMap<String, mpsc::Sender<PluginEvent>>,
/// Map of database to wal trigger name to handler
wal_triggers: HashMap<String, HashMap<String, mpsc::Sender<WalEvent>>>,
/// Map of database to schedule trigger name to handler
schedule_triggers: HashMap<String, HashMap<String, mpsc::Sender<ScheduleEvent>>>,
/// Map of request path to the request trigger handler
request_triggers: HashMap<String, mpsc::Sender<RequestEvent>>,
}

#[cfg(feature = "system-py")]
Expand All @@ -60,77 +62,115 @@ impl PluginChannels {
&self,
db: String,
trigger: String,
trigger_spec: &TriggerSpecificationDefinition,
) -> Result<Option<Receiver<()>>, ProcessingEngineError> {
if let Some(trigger_map) = self.active_triggers.get(&db) {
if let Some(sender) = trigger_map.get(&trigger) {
// create a one shot to wait for the shutdown to complete
let (tx, rx) = oneshot::channel();
if sender.send(PluginEvent::Shutdown(tx)).await.is_err() {
return Err(ProcessingEngineError::TriggerShutdownError {
database: db,
trigger_name: trigger,
});
match trigger_spec {
TriggerSpecificationDefinition::SingleTableWalWrite { .. }
| TriggerSpecificationDefinition::AllTablesWalWrite => {
if let Some(trigger_map) = self.wal_triggers.get(&db) {
if let Some(sender) = trigger_map.get(&trigger) {
// create a one shot to wait for the shutdown to complete
let (tx, rx) = oneshot::channel();
if sender.send(WalEvent::Shutdown(tx)).await.is_err() {
return Err(ProcessingEngineError::TriggerShutdownError {
database: db,
trigger_name: trigger,
});
}
return Ok(Some(rx));
}
}
}
TriggerSpecificationDefinition::Schedule { .. }
| TriggerSpecificationDefinition::Every { .. } => {
if let Some(trigger_map) = self.schedule_triggers.get(&db) {
if let Some(sender) = trigger_map.get(&trigger) {
// create a one shot to wait for the shutdown to complete
let (tx, rx) = oneshot::channel();
if sender.send(ScheduleEvent::Shutdown(tx)).await.is_err() {
return Err(ProcessingEngineError::TriggerShutdownError {
database: db,
trigger_name: trigger,
});
}
return Ok(Some(rx));
}
}
}
TriggerSpecificationDefinition::RequestPath { .. } => {
if let Some(sender) = self.request_triggers.get(&trigger) {
// create a one shot to wait for the shutdown to complete
let (tx, rx) = oneshot::channel();
if sender.send(RequestEvent::Shutdown(tx)).await.is_err() {
return Err(ProcessingEngineError::TriggerShutdownError {
database: db,
trigger_name: trigger,
});
}
return Ok(Some(rx));
}
return Ok(Some(rx));
}
}
Ok(None)
}

fn remove_trigger(&mut self, db: String, trigger: String) {
if let Some(trigger_map) = self.active_triggers.get_mut(&db) {
trigger_map.remove(&trigger);
}
Ok(None)
}

#[cfg(feature = "system-py")]
fn add_trigger(
fn remove_trigger(
&mut self,
trigger_spec: &TriggerSpecificationDefinition,
db: String,
trigger: String,
) -> mpsc::Receiver<PluginEvent> {
observability_deps::tracing::info!(%db, ?trigger, ?trigger_spec, "adding trigger to plugin event channel");
let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);

trigger_spec: &TriggerSpecificationDefinition,
) {
match trigger_spec {
TriggerSpecificationDefinition::SingleTableWalWrite { .. } => {
self.active_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
}
TriggerSpecificationDefinition::AllTablesWalWrite => {
self.active_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
}
TriggerSpecificationDefinition::Schedule { .. } => {
self.active_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
TriggerSpecificationDefinition::SingleTableWalWrite { .. }
| TriggerSpecificationDefinition::AllTablesWalWrite => {
if let Some(trigger_map) = self.wal_triggers.get_mut(&db) {
trigger_map.remove(&trigger);
}
}
TriggerSpecificationDefinition::Every { .. } => {
self.active_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
TriggerSpecificationDefinition::Schedule { .. }
| TriggerSpecificationDefinition::Every { .. } => {
if let Some(trigger_map) = self.schedule_triggers.get_mut(&db) {
trigger_map.remove(&trigger);
}
}
TriggerSpecificationDefinition::RequestPath { path } => {
self.request_triggers.insert(path.to_string(), tx);
TriggerSpecificationDefinition::RequestPath { .. } => {
self.request_triggers.remove(&trigger);
}
}
}

#[cfg(feature = "system-py")]
fn add_wal_trigger(&mut self, db: String, trigger: String) -> mpsc::Receiver<WalEvent> {
let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
self.wal_triggers.entry(db).or_default().insert(trigger, tx);
rx
}

fn add_schedule_trigger(
&mut self,
db: String,
trigger: String,
) -> mpsc::Receiver<ScheduleEvent> {
let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
self.schedule_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
rx
}

fn add_request_trigger(&mut self, path: String) -> mpsc::Receiver<RequestEvent> {
let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
self.request_triggers.insert(path, tx);
rx
}

async fn send_wal_contents(&self, wal_contents: Arc<WalContents>) {
for (db, trigger_map) in &self.active_triggers {
for (db, trigger_map) in &self.wal_triggers {
for (trigger, sender) in trigger_map {
if let Err(e) = sender
.send(PluginEvent::WriteWalContents(Arc::clone(&wal_contents)))
.send(WalEvent::WriteWalContents(Arc::clone(&wal_contents)))
.await
{
warn!(%e, %db, ?trigger, "error sending wal contents to plugin");
Expand All @@ -144,7 +184,7 @@ impl PluginChannels {
trigger_path: &str,
request: Request,
) -> Result<(), ProcessingEngineError> {
let event = PluginEvent::Request(request);
let event = RequestEvent::Request(request);
if let Some(sender) = self.request_triggers.get(trigger_path) {
if sender.send(event).await.is_err() {
return Err(ProcessingEngineError::RequestTriggerNotFound);
Expand Down Expand Up @@ -450,14 +490,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
})?
.clone();

let trigger_rx = self.plugin_event_tx.write().await.add_trigger(
&trigger.trigger,
db_name.to_string(),
trigger_name.to_string(),
);

let plugin_context = PluginContext {
trigger_rx,
write_buffer,
query_executor,
};
Expand All @@ -473,25 +506,52 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
.into());
};
match plugin_definition.plugin_type {
PluginType::WalRows => plugins::run_wal_contents_plugin(
db_name.to_string(),
plugin_code,
trigger,
plugin_context,
),
PluginType::Scheduled => plugins::run_schedule_plugin(
db_name.to_string(),
plugin_code,
trigger,
Arc::clone(&self.time_provider),
plugin_context,
)?,
PluginType::Request => plugins::run_request_plugin(
db_name.to_string(),
plugin_code,
trigger,
plugin_context,
),
PluginType::WalRows => {
let rec = self
.plugin_event_tx
.write()
.await
.add_wal_trigger(db_name.to_string(), trigger_name.to_string());

plugins::run_wal_contents_plugin(
db_name.to_string(),
plugin_code,
trigger,
plugin_context,
rec,
)
}
PluginType::Scheduled => {
let rec = self
.plugin_event_tx
.write()
.await
.add_schedule_trigger(db_name.to_string(), trigger_name.to_string());

plugins::run_schedule_plugin(
db_name.to_string(),
plugin_code,
trigger,
Arc::clone(&self.time_provider),
plugin_context,
rec,
)?
}
PluginType::Request => {
let rec = self
.plugin_event_tx
.write()
.await
.add_request_trigger(trigger_name.to_string());

plugins::run_request_plugin(
db_name.to_string(),
plugin_code,
trigger,
plugin_context,
rec,
)
}
}
}

Expand Down Expand Up @@ -537,7 +597,11 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
.plugin_event_tx
.write()
.await
.send_shutdown(db_name.to_string(), trigger_name.to_string())
.send_shutdown(
db_name.to_string(),
trigger_name.to_string(),
&trigger.trigger,
)
.await?
else {
return Ok(());
Expand All @@ -548,10 +612,11 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
"shutdown trigger receiver dropped, may have received multiple shutdown requests"
);
} else {
self.plugin_event_tx
.write()
.await
.remove_trigger(db_name.to_string(), trigger_name.to_string());
self.plugin_event_tx.write().await.remove_trigger(
db_name.to_string(),
trigger_name.to_string(),
&trigger.trigger,
);
}

Ok(())
Expand Down Expand Up @@ -738,9 +803,16 @@ impl WalFileNotifier for ProcessingEngineManagerImpl {
}
}

#[allow(unused)]
pub(crate) enum PluginEvent {
pub(crate) enum WalEvent {
WriteWalContents(Arc<WalContents>),
Shutdown(oneshot::Sender<()>),
}

pub(crate) enum ScheduleEvent {
Shutdown(oneshot::Sender<()>),
}

pub(crate) enum RequestEvent {
Request(Request),
Shutdown(oneshot::Sender<()>),
}
Expand Down
Loading

0 comments on commit 7c38b8b

Please sign in to comment.