Skip to content

Commit

Permalink
feat(engine): implement NoopPubSub backend and refactor CloudPubSub i…
Browse files Browse the repository at this point in the history
…ntegration (#690)
  • Loading branch information
miseyu authored Dec 12, 2024
1 parent fffa0a5 commit 2fa9aeb
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 40 deletions.
20 changes: 17 additions & 3 deletions engine/worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,26 @@ graphs:
#### `workerAssetPath`
* The path to the assets local directory.
``` yaml
env.get("workerAssetPath")
```

#### `workerArtifactPath`
* The path to the artifacts local directory.
``` yaml
env.get("workerArtifactPath")
```

### PubSub
#### Topics
* environment variables: FLOW_WORKER_EDGE_PASS_THROUGH_EVENT_TOPIC (default: flow-edge-pass-through-topic)
* environment variables: FLOW_WORKER_LOG_STREAM_TOPIC (default: flow-log-stream-topic)
* environment variables: FLOW_WORKER_JOB_COMPLETE_TOPIC (default: flow-job-complete-topic)
* flow-edge-pass-through-topic
* flow-log-stream-topic
* flow-job-complete-topic

### Runtime Environment Variables
| Name | Description | Default |
| ----------------------------------------- | ---------------------------------------------------------------------------- | ---------------------------- |
| FLOW_WORKER_EDGE_PASS_THROUGH_EVENT_TOPIC | PubSub topic name for the event that occurs when the Feature passes the edge | flow-edge-pass-through-topic |
| FLOW_WORKER_LOG_STREAM_TOPIC | Topic name of the event that occurs when the log comes into the log stream | flow-log-stream-topic |
| FLOW_WORKER_JOB_COMPLETE_TOPIC | Topic name of the event that will occur when the job is completed | flow-job-complete-topic |
| FLOW_WORKER_ENABLE_JSON_LOG | Enable log format to JSON format | false |
77 changes: 43 additions & 34 deletions engine/worker/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{collections::HashMap, io, str::FromStr, sync::Arc};

use clap::{Arg, ArgAction, ArgMatches, Command};
use google_cloud_pubsub::client::{Client, ClientConfig};
use reearth_flow_action_log::factory::{create_root_logger, LoggerFactory};
use reearth_flow_common::{dir::setup_job_directory, uri::Uri};
use reearth_flow_runner::runner::AsyncRunner;
Expand All @@ -14,7 +13,7 @@ use crate::{
asset::download_asset,
event_handler::EventHandler,
factory::ALL_ACTION_FACTORIES,
pubsub::{publisher::Publisher, CloudPubSub},
pubsub::{backend::PubSubBackend, publisher::Publisher},
types::{
job_complete_event::{JobCompleteEvent, JobResult},
metadata::Metadata,
Expand All @@ -31,6 +30,7 @@ pub fn build_worker_command() -> Command {
.arg(workflow_arg())
.arg(asset_arg())
.arg(worker_num_arg())
.arg(pubsub_backend_arg())
.arg(vars_arg())
}

Expand Down Expand Up @@ -61,13 +61,23 @@ fn worker_num_arg() -> Arg {
.display_order(3)
}

fn pubsub_backend_arg() -> Arg {
Arg::new("pubsub_backend")
.long("pubsub-backend")
.help("PubSub backend")
.env("FLOW_WORKER_PUBSUB_BACKEND")
.required(false)
.default_value("google")
.display_order(4)
}

fn vars_arg() -> Arg {
Arg::new("var")
.long("var")
.help("Workflow variables")
.required(false)
.action(ArgAction::Append)
.display_order(4)
.display_order(5)
}

#[derive(Debug, Clone, Eq, PartialEq)]
Expand All @@ -76,6 +86,7 @@ pub struct RunWorkerCommand {
metadata_path: Uri,
vars: HashMap<String, String>,
worker_num: usize,
pubsub_backend: String,
}

impl RunWorkerCommand {
Expand All @@ -89,6 +100,9 @@ impl RunWorkerCommand {
let metadata_path =
Uri::from_str(metadata_path.as_str()).map_err(crate::errors::Error::init)?;
let worker_num = matches.remove_one::<usize>("worker_num").unwrap_or(100);
let pubsub_backend = matches
.remove_one::<String>("pubsub_backend")
.unwrap_or_else(|| "google".to_string());
let vars = matches.remove_many::<String>("var");
let vars = if let Some(vars) = vars {
vars.into_iter()
Expand All @@ -109,6 +123,7 @@ impl RunWorkerCommand {
metadata_path,
vars,
worker_num,
pubsub_backend,
})
}

Expand All @@ -123,10 +138,21 @@ impl RunWorkerCommand {

async fn run(&self) -> crate::errors::Result<()> {
let storage_resolver = Arc::new(resolve::StorageResolver::new());
let (workflow, state, logger_factory, event_handler, meta) =
self.prepare(&storage_resolver).await?;
let (workflow, state, logger_factory, meta) = self.prepare(&storage_resolver).await?;

let pubsub = PubSubBackend::try_from(self.pubsub_backend.as_str())
.await
.map_err(crate::errors::Error::init)?;

let handler: Arc<dyn reearth_flow_runtime::event::EventHandler> = match pubsub {
PubSubBackend::Google(pubsub) => {
Arc::new(EventHandler::new(workflow.id, meta.job_id, pubsub))
}
PubSubBackend::Noop(pubsub) => {
Arc::new(EventHandler::new(workflow.id, meta.job_id, pubsub))
}
};
let workflow_id = workflow.id;
let handler: Arc<dyn reearth_flow_runtime::event::EventHandler> = Arc::new(event_handler);
let result = AsyncRunner::run_with_event_handler(
meta.job_id,
workflow,
Expand All @@ -137,37 +163,29 @@ impl RunWorkerCommand {
vec![handler],
)
.await;
let config = ClientConfig::default()
.with_auth()
.await
.map_err(crate::errors::Error::init)?;
let client = Client::new(config)
.await
.map_err(crate::errors::Error::init)?;
let pubsub = CloudPubSub::new(client);
let job_result = match result {
Ok(_) => {
self.cleanup(&meta, &storage_resolver).await?;
JobResult::Success
}
Err(_) => JobResult::Failed,
};
pubsub
.publish(JobCompleteEvent::new(workflow_id, meta.job_id, job_result))
let pubsub = PubSubBackend::try_from(self.pubsub_backend.as_str())
.await
.map_err(crate::errors::Error::run)
.map_err(crate::errors::Error::init)?;
match pubsub {
PubSubBackend::Google(pubsub) => pubsub
.publish(JobCompleteEvent::new(workflow_id, meta.job_id, job_result))
.await
.map_err(crate::errors::Error::run),
PubSubBackend::Noop(_) => Ok(()),
}
}

async fn prepare(
&self,
storage_resolver: &Arc<StorageResolver>,
) -> crate::errors::Result<(
Workflow,
Arc<State>,
Arc<LoggerFactory>,
EventHandler<CloudPubSub>,
Metadata,
)> {
) -> crate::errors::Result<(Workflow, Arc<State>, Arc<LoggerFactory>, Metadata)> {
let json = if self.workflow == "-" {
io::read_to_string(io::stdin()).map_err(crate::errors::Error::init)?
} else {
Expand Down Expand Up @@ -241,16 +259,7 @@ impl RunWorkerCommand {
create_root_logger(action_log_uri.path()),
action_log_uri.path(),
));
let config = ClientConfig::default()
.with_auth()
.await
.map_err(crate::errors::Error::init)?;
let client = Client::new(config)
.await
.map_err(crate::errors::Error::init)?;

let event_handler = EventHandler::new(workflow.id, job_id, CloudPubSub::new(client));
Ok((workflow, state, logger_factory, event_handler, meta))
Ok((workflow, state, logger_factory, meta))
}

async fn cleanup(
Expand Down
2 changes: 0 additions & 2 deletions engine/worker/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,3 @@ pub(crate) mod backend;
pub(crate) mod message;
pub(crate) mod publisher;
pub(crate) mod topic;

pub(crate) use backend::google_pubsub::CloudPubSub;
44 changes: 43 additions & 1 deletion engine/worker/src/pubsub/backend.rs
Original file line number Diff line number Diff line change
@@ -1 +1,43 @@
pub(crate) mod google_pubsub;
use google_cloud_pubsub::client::{Client, ClientConfig};

pub(crate) mod google;
pub(crate) mod noop;

#[derive(thiserror::Error, Debug)]
pub(crate) enum PubSubBackendError {
#[error("Failed to try from : {0}")]
TryFrom(String),
#[error("Failed to create : {0}")]
Create(String),
}

impl PubSubBackendError {
pub(crate) fn create<T: ToString>(message: T) -> Self {
Self::Create(message.to_string())
}
}

pub(crate) enum PubSubBackend {
Google(google::CloudPubSub),
Noop(noop::NoopPubSub),
}

impl PubSubBackend {
pub(crate) async fn try_from(value: &str) -> Result<Self, PubSubBackendError> {
let value = value.to_lowercase();
match value.as_str() {
"google" => {
let config = ClientConfig::default()
.with_auth()
.await
.map_err(PubSubBackendError::create)?;
let client = Client::new(config)
.await
.map_err(PubSubBackendError::create)?;
Ok(Self::Google(google::CloudPubSub::new(client)))
}
"noop" => Ok(Self::Noop(noop::NoopPubSub {})),
_ => Err(PubSubBackendError::TryFrom(value)),
}
}
}
File renamed without changes.
17 changes: 17 additions & 0 deletions engine/worker/src/pubsub/backend/noop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use crate::pubsub::message::EncodableMessage;

#[derive(thiserror::Error, Debug)]
pub enum NoopPubSubError {}

pub struct NoopPubSub {}

#[async_trait::async_trait]
impl crate::pubsub::publisher::Publisher for NoopPubSub {
type Error = NoopPubSubError;

async fn publish<M: EncodableMessage>(&self, _message: M) -> Result<(), Self::Error> {
Ok(())
}

async fn shutdown(&self) {}
}

0 comments on commit 2fa9aeb

Please sign in to comment.