Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: each plugin type uses its own channel #25908

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 159 additions & 87 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ pub struct ProcessingEngineManagerImpl {

#[derive(Debug, Default)]
struct PluginChannels {
/// Map of database to trigger name to sender
active_triggers: HashMap<String, HashMap<String, mpsc::Sender<PluginEvent>>>,
/// Map of request path to the sender
request_triggers: HashMap<String, mpsc::Sender<PluginEvent>>,
/// Map of database to wal trigger name to handler
wal_triggers: HashMap<String, HashMap<String, mpsc::Sender<WalEvent>>>,
/// Map of database to schedule trigger name to handler
schedule_triggers: HashMap<String, HashMap<String, mpsc::Sender<ScheduleEvent>>>,
/// Map of request path to the request trigger handler
request_triggers: HashMap<String, mpsc::Sender<RequestEvent>>,
}

#[cfg(feature = "system-py")]
Expand All @@ -60,77 +62,115 @@ impl PluginChannels {
&self,
db: String,
trigger: String,
trigger_spec: &TriggerSpecificationDefinition,
) -> Result<Option<Receiver<()>>, ProcessingEngineError> {
if let Some(trigger_map) = self.active_triggers.get(&db) {
if let Some(sender) = trigger_map.get(&trigger) {
// create a one shot to wait for the shutdown to complete
let (tx, rx) = oneshot::channel();
if sender.send(PluginEvent::Shutdown(tx)).await.is_err() {
return Err(ProcessingEngineError::TriggerShutdownError {
database: db,
trigger_name: trigger,
});
match trigger_spec {
TriggerSpecificationDefinition::SingleTableWalWrite { .. }
| TriggerSpecificationDefinition::AllTablesWalWrite => {
if let Some(trigger_map) = self.wal_triggers.get(&db) {
if let Some(sender) = trigger_map.get(&trigger) {
// create a one shot to wait for the shutdown to complete
let (tx, rx) = oneshot::channel();
if sender.send(WalEvent::Shutdown(tx)).await.is_err() {
return Err(ProcessingEngineError::TriggerShutdownError {
database: db,
trigger_name: trigger,
});
}
return Ok(Some(rx));
}
}
}
TriggerSpecificationDefinition::Schedule { .. }
| TriggerSpecificationDefinition::Every { .. } => {
if let Some(trigger_map) = self.schedule_triggers.get(&db) {
if let Some(sender) = trigger_map.get(&trigger) {
// create a one shot to wait for the shutdown to complete
let (tx, rx) = oneshot::channel();
if sender.send(ScheduleEvent::Shutdown(tx)).await.is_err() {
return Err(ProcessingEngineError::TriggerShutdownError {
database: db,
trigger_name: trigger,
});
}
return Ok(Some(rx));
}
}
}
TriggerSpecificationDefinition::RequestPath { .. } => {
if let Some(sender) = self.request_triggers.get(&trigger) {
// create a one shot to wait for the shutdown to complete
let (tx, rx) = oneshot::channel();
if sender.send(RequestEvent::Shutdown(tx)).await.is_err() {
return Err(ProcessingEngineError::TriggerShutdownError {
database: db,
trigger_name: trigger,
});
}
return Ok(Some(rx));
}
return Ok(Some(rx));
}
}
Ok(None)
}

fn remove_trigger(&mut self, db: String, trigger: String) {
if let Some(trigger_map) = self.active_triggers.get_mut(&db) {
trigger_map.remove(&trigger);
}
Ok(None)
}

#[cfg(feature = "system-py")]
fn add_trigger(
fn remove_trigger(
&mut self,
trigger_spec: &TriggerSpecificationDefinition,
db: String,
trigger: String,
) -> mpsc::Receiver<PluginEvent> {
observability_deps::tracing::info!(%db, ?trigger, ?trigger_spec, "adding trigger to plugin event channel");
let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);

trigger_spec: &TriggerSpecificationDefinition,
) {
match trigger_spec {
TriggerSpecificationDefinition::SingleTableWalWrite { .. } => {
self.active_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
}
TriggerSpecificationDefinition::AllTablesWalWrite => {
self.active_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
}
TriggerSpecificationDefinition::Schedule { .. } => {
self.active_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
TriggerSpecificationDefinition::SingleTableWalWrite { .. }
| TriggerSpecificationDefinition::AllTablesWalWrite => {
if let Some(trigger_map) = self.wal_triggers.get_mut(&db) {
trigger_map.remove(&trigger);
}
}
TriggerSpecificationDefinition::Every { .. } => {
self.active_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
TriggerSpecificationDefinition::Schedule { .. }
| TriggerSpecificationDefinition::Every { .. } => {
if let Some(trigger_map) = self.schedule_triggers.get_mut(&db) {
trigger_map.remove(&trigger);
}
}
TriggerSpecificationDefinition::RequestPath { path } => {
self.request_triggers.insert(path.to_string(), tx);
TriggerSpecificationDefinition::RequestPath { .. } => {
self.request_triggers.remove(&trigger);
}
}
}

#[cfg(feature = "system-py")]
fn add_wal_trigger(&mut self, db: String, trigger: String) -> mpsc::Receiver<WalEvent> {
let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
self.wal_triggers.entry(db).or_default().insert(trigger, tx);
rx
}

fn add_schedule_trigger(
&mut self,
db: String,
trigger: String,
) -> mpsc::Receiver<ScheduleEvent> {
let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
self.schedule_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
rx
}

fn add_request_trigger(&mut self, path: String) -> mpsc::Receiver<RequestEvent> {
let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
self.request_triggers.insert(path, tx);
rx
}

async fn send_wal_contents(&self, wal_contents: Arc<WalContents>) {
for (db, trigger_map) in &self.active_triggers {
for (db, trigger_map) in &self.wal_triggers {
for (trigger, sender) in trigger_map {
if let Err(e) = sender
.send(PluginEvent::WriteWalContents(Arc::clone(&wal_contents)))
.send(WalEvent::WriteWalContents(Arc::clone(&wal_contents)))
.await
{
warn!(%e, %db, ?trigger, "error sending wal contents to plugin");
Expand All @@ -144,7 +184,7 @@ impl PluginChannels {
trigger_path: &str,
request: Request,
) -> Result<(), ProcessingEngineError> {
let event = PluginEvent::Request(request);
let event = RequestEvent::Request(request);
if let Some(sender) = self.request_triggers.get(trigger_path) {
if sender.send(event).await.is_err() {
return Err(ProcessingEngineError::RequestTriggerNotFound);
Expand Down Expand Up @@ -450,14 +490,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
})?
.clone();

let trigger_rx = self.plugin_event_tx.write().await.add_trigger(
&trigger.trigger,
db_name.to_string(),
trigger_name.to_string(),
);

let plugin_context = PluginContext {
trigger_rx,
write_buffer,
query_executor,
};
Expand All @@ -473,25 +506,52 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
.into());
};
match plugin_definition.plugin_type {
PluginType::WalRows => plugins::run_wal_contents_plugin(
db_name.to_string(),
plugin_code,
trigger,
plugin_context,
),
PluginType::Scheduled => plugins::run_schedule_plugin(
db_name.to_string(),
plugin_code,
trigger,
Arc::clone(&self.time_provider),
plugin_context,
)?,
PluginType::Request => plugins::run_request_plugin(
db_name.to_string(),
plugin_code,
trigger,
plugin_context,
),
PluginType::WalRows => {
let rec = self
.plugin_event_tx
.write()
.await
.add_wal_trigger(db_name.to_string(), trigger_name.to_string());

plugins::run_wal_contents_plugin(
db_name.to_string(),
plugin_code,
trigger,
plugin_context,
rec,
)
}
PluginType::Scheduled => {
let rec = self
.plugin_event_tx
.write()
.await
.add_schedule_trigger(db_name.to_string(), trigger_name.to_string());

plugins::run_schedule_plugin(
db_name.to_string(),
plugin_code,
trigger,
Arc::clone(&self.time_provider),
plugin_context,
rec,
)?
}
PluginType::Request => {
let rec = self
.plugin_event_tx
.write()
.await
.add_request_trigger(trigger_name.to_string());

plugins::run_request_plugin(
db_name.to_string(),
plugin_code,
trigger,
plugin_context,
rec,
)
}
}
}

Expand Down Expand Up @@ -537,7 +597,11 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
.plugin_event_tx
.write()
.await
.send_shutdown(db_name.to_string(), trigger_name.to_string())
.send_shutdown(
db_name.to_string(),
trigger_name.to_string(),
&trigger.trigger,
)
.await?
else {
return Ok(());
Expand All @@ -548,10 +612,11 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
"shutdown trigger receiver dropped, may have received multiple shutdown requests"
);
} else {
self.plugin_event_tx
.write()
.await
.remove_trigger(db_name.to_string(), trigger_name.to_string());
self.plugin_event_tx.write().await.remove_trigger(
db_name.to_string(),
trigger_name.to_string(),
&trigger.trigger,
);
}

Ok(())
Expand Down Expand Up @@ -738,9 +803,16 @@ impl WalFileNotifier for ProcessingEngineManagerImpl {
}
}

#[allow(unused)]
pub(crate) enum PluginEvent {
pub(crate) enum WalEvent {
WriteWalContents(Arc<WalContents>),
Shutdown(oneshot::Sender<()>),
}

pub(crate) enum ScheduleEvent {
Shutdown(oneshot::Sender<()>),
}

pub(crate) enum RequestEvent {
Request(Request),
Shutdown(oneshot::Sender<()>),
}
Expand Down
Loading
Loading