From 32733ec7e40d94b8868cbfc5c603982542be7518 Mon Sep 17 00:00:00 2001 From: Bittrance Date: Sun, 1 Oct 2023 20:30:09 +0200 Subject: [PATCH] ActionOutput -> WorkloadEvent and Workload::work() -> perform(). --- src/actions.rs | 20 +++++++++++------- src/opts.rs | 5 +---- src/receiver.rs | 24 +++++++++++---------- src/task/github.rs | 16 ++++++++------ src/task/gixworkload.rs | 22 +++++++++---------- src/task/mod.rs | 2 +- src/task/scheduled.rs | 2 +- src/testutils.rs | 2 +- tests/workload.rs | 47 +++++++++++++++++++++++------------------ 9 files changed, 76 insertions(+), 64 deletions(-) diff --git a/src/actions.rs b/src/actions.rs index 0a5e627..de375fa 100644 --- a/src/actions.rs +++ b/src/actions.rs @@ -13,7 +13,7 @@ use serde::Deserialize; use crate::{ errors::GitOpsError, opts::CliOptions, - receiver::{ActionOutput, SourceType}, + receiver::{SourceType, WorkloadEvent}, }; #[derive(Debug, PartialEq)] @@ -86,7 +86,7 @@ fn emit_data( ) -> JoinHandle> where R: Read + Send + 'static, - F: Fn(ActionOutput) -> Result<(), GitOpsError> + Send + 'static, + F: Fn(WorkloadEvent) -> Result<(), GitOpsError> + Send + 'static, { let sink = Arc::clone(sink); spawn(move || { @@ -96,7 +96,7 @@ where if len == 0 { break; } - sink.lock().unwrap()(ActionOutput::Output( + sink.lock().unwrap()(WorkloadEvent::ActionOutput( name.clone(), source_type, buf[..len].into(), @@ -114,7 +114,7 @@ pub fn run_action( sink: &Arc>, ) -> Result where - F: Fn(ActionOutput) -> Result<(), GitOpsError> + Send + 'static, + F: Fn(WorkloadEvent) -> Result<(), GitOpsError> + Send + 'static, { let mut command = build_command(action, cwd); let mut child = command.spawn().map_err(GitOpsError::ActionError)?; @@ -125,7 +125,7 @@ where emit_data(name.to_string(), stderr, SourceType::StdErr, sink); loop { if let Some(exit) = child.try_wait().map_err(GitOpsError::ActionError)? { - sink.lock().unwrap()(ActionOutput::Exit(name.to_string(), exit))?; + sink.lock().unwrap()(WorkloadEvent::ActionExit(name.to_string(), exit))?; if exit.success() { break Ok(ActionResult::Success); } else { @@ -134,7 +134,7 @@ where } if Instant::now() > deadline { child.kill().map_err(GitOpsError::ActionError)?; - sink.lock().unwrap()(ActionOutput::Timeout(name.to_string()))?; + sink.lock().unwrap()(WorkloadEvent::Timeout(name.to_string()))?; break Ok(ActionResult::Failure); } sleep(Duration::from_secs(1)); @@ -179,8 +179,12 @@ mod tests { assert!(matches!(res, Ok(ActionResult::Success))); assert_eq!( vec![ - ActionOutput::Output("test".to_owned(), SourceType::StdOut, b"test\n".to_vec()), - ActionOutput::Exit("test".to_owned(), ExitStatus::from_raw(0)), + WorkloadEvent::ActionOutput( + "test".to_owned(), + SourceType::StdOut, + b"test\n".to_vec() + ), + WorkloadEvent::ActionExit("test".to_owned(), ExitStatus::from_raw(0)), ], events.lock().unwrap()[..] ); diff --git a/src/opts.rs b/src/opts.rs index 9e9bbad..9b6ba7b 100644 --- a/src/opts.rs +++ b/src/opts.rs @@ -8,10 +8,7 @@ use crate::{ receiver::logging_receiver, store::{FileStore, Store}, task::{ - github::github_watcher, - gixworkload::GitWorkload, - scheduled::ScheduledTask, - GitTaskConfig, + github::github_watcher, gixworkload::GitWorkload, scheduled::ScheduledTask, GitTaskConfig, }, }; diff --git a/src/receiver.rs b/src/receiver.rs index 28e39f5..f2e9823 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -9,42 +9,44 @@ pub enum SourceType { } #[derive(Clone, Debug, PartialEq)] -pub enum ActionOutput { +pub enum WorkloadEvent { // TODO Name types would be nice Changes(String, ObjectId, ObjectId), - Output(String, SourceType, Vec), - Exit(String, ExitStatus), + ActionOutput(String, SourceType, Vec), + ActionExit(String, ExitStatus), Success(String, ObjectId), Failure(String, String, ObjectId), Error(String, String, ObjectId), Timeout(String), } -pub fn logging_receiver(events: &Receiver) { +pub fn logging_receiver(events: &Receiver) { while let Ok(event) = events.recv() { match event { - ActionOutput::Changes(name, prev_sha, new_sha) => { + WorkloadEvent::Changes(name, prev_sha, new_sha) => { if prev_sha == ObjectId::null(Kind::Sha1) { println!("{}: New repo @ {}", name, new_sha); } else { println!("{}: Updated repo {} -> {}", name, prev_sha, new_sha); } } - ActionOutput::Output(name, source_type, data) => match source_type { + WorkloadEvent::ActionOutput(name, source_type, data) => match source_type { SourceType::StdOut => println!("{}: {}", name, String::from_utf8_lossy(&data)), SourceType::StdErr => eprintln!("{}: {}", name, String::from_utf8_lossy(&data)), }, - ActionOutput::Exit(name, exit) => println!("{}: exited with code {}", name, exit), - ActionOutput::Success(name, new_sha) => { + WorkloadEvent::ActionExit(name, exit) => { + println!("{}: exited with code {}", name, exit) + } + WorkloadEvent::Success(name, new_sha) => { println!("{}: actions successful for {}", name, new_sha) } - ActionOutput::Failure(task, action, new_sha) => { + WorkloadEvent::Failure(task, action, new_sha) => { println!("{}: action {} failed for {}", task, action, new_sha) } - ActionOutput::Error(name, error, new_sha) => { + WorkloadEvent::Error(name, error, new_sha) => { println!("{}: error running actions for {}: {}", name, new_sha, error) } - ActionOutput::Timeout(name) => println!("{}: took too long", name), + WorkloadEvent::Timeout(name) => println!("{}: took too long", name), } } } diff --git a/src/task/github.rs b/src/task/github.rs index e4f4709..797abd4 100644 --- a/src/task/github.rs +++ b/src/task/github.rs @@ -9,7 +9,7 @@ use reqwest::{ use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::{errors::GitOpsError, opts::CliOptions, receiver::ActionOutput}; +use crate::{errors::GitOpsError, opts::CliOptions, receiver::WorkloadEvent}; #[derive(Clone, Deserialize)] pub struct GitHubNotifyConfig { @@ -182,10 +182,12 @@ pub fn update_commit_status( } } -pub fn github_watcher(notify_config: GitHubNotifyConfig) -> impl Fn(ActionOutput) -> Result<(), GitOpsError> + Send + 'static { +pub fn github_watcher( + notify_config: GitHubNotifyConfig, +) -> impl Fn(WorkloadEvent) -> Result<(), GitOpsError> + Send + 'static { move |event| { match event { - ActionOutput::Changes(name, prev_sha, new_sha) => { + WorkloadEvent::Changes(name, prev_sha, new_sha) => { update_commit_status( ¬ify_config, &new_sha, @@ -193,7 +195,7 @@ pub fn github_watcher(notify_config: GitHubNotifyConfig) -> impl Fn(ActionOutput &format!("running {} [last success {}]", name, prev_sha), )?; } - ActionOutput::Success(name, new_sha) => { + WorkloadEvent::Success(name, new_sha) => { update_commit_status( ¬ify_config, &new_sha, @@ -201,7 +203,7 @@ pub fn github_watcher(notify_config: GitHubNotifyConfig) -> impl Fn(ActionOutput &format!("{} succeeded", name), )?; } - ActionOutput::Failure(task, action, new_sha) => { + WorkloadEvent::Failure(task, action, new_sha) => { update_commit_status( ¬ify_config, &new_sha, @@ -209,7 +211,7 @@ pub fn github_watcher(notify_config: GitHubNotifyConfig) -> impl Fn(ActionOutput &format!("{} failed on action {}", task, action), )?; } - ActionOutput::Error(task, action, new_sha) => { + WorkloadEvent::Error(task, action, new_sha) => { update_commit_status( ¬ify_config, &new_sha, @@ -221,4 +223,4 @@ pub fn github_watcher(notify_config: GitHubNotifyConfig) -> impl Fn(ActionOutput }; Ok(()) } -} \ No newline at end of file +} diff --git a/src/task/gixworkload.rs b/src/task/gixworkload.rs index 3617431..e8b798a 100644 --- a/src/task/gixworkload.rs +++ b/src/task/gixworkload.rs @@ -1,5 +1,5 @@ use std::{ - path::{PathBuf, Path}, + path::{Path, PathBuf}, sync::{Arc, Mutex}, time::{Duration, Instant}, }; @@ -11,7 +11,7 @@ use crate::{ errors::GitOpsError, git::ensure_worktree, opts::CliOptions, - receiver::ActionOutput, + receiver::WorkloadEvent, }; use super::{GitTaskConfig, Workload}; @@ -22,7 +22,7 @@ pub struct GitWorkload { config: GitTaskConfig, repo_dir: PathBuf, watchers: - Vec Result<(), GitOpsError> + Send + 'static>>>>, + Vec Result<(), GitOpsError> + Send + 'static>>>>, } impl GitWorkload { @@ -41,7 +41,7 @@ impl GitWorkload { pub fn watch( &mut self, - watcher: impl Fn(ActionOutput) -> Result<(), GitOpsError> + Send + 'static, + watcher: impl Fn(WorkloadEvent) -> Result<(), GitOpsError> + Send + 'static, ) { self.watchers.push(Arc::new(Mutex::new(Box::new(watcher)))); } @@ -50,7 +50,7 @@ impl GitWorkload { &self, workdir: &Path, deadline: Instant, - sink: &Arc Result<(), GitOpsError> + Send + 'static>>, + sink: &Arc Result<(), GitOpsError> + Send + 'static>>, ) -> Result, GitOpsError> { for action in &self.config.actions { let name = format!("{}|{}", self.config.name, action.id()); @@ -72,10 +72,10 @@ impl Workload for GitWorkload { self.config.interval } - fn work(&self, workdir: PathBuf, current_sha: ObjectId) -> Result { + fn perform(&self, workdir: PathBuf, current_sha: ObjectId) -> Result { let deadline = Instant::now() + self.config.timeout; let watchers = self.watchers.clone(); - let sink = Arc::new(Mutex::new(move |event: ActionOutput| { + let sink = Arc::new(Mutex::new(move |event: WorkloadEvent| { for watcher in &watchers { watcher.lock().unwrap()(event.clone())?; } @@ -84,7 +84,7 @@ impl Workload for GitWorkload { let new_sha = ensure_worktree(&self.config.git, deadline, &self.repo_dir, &workdir)?; if current_sha != new_sha { - sink.lock().unwrap()(ActionOutput::Changes( + sink.lock().unwrap()(WorkloadEvent::Changes( self.config.name.clone(), current_sha, new_sha, @@ -92,11 +92,11 @@ impl Workload for GitWorkload { .map_err(|err| GitOpsError::NotifyError(format!("{}", err)))?; match self.run_actions(&workdir, deadline, &sink) { Ok(None) => { - sink.lock().unwrap()(ActionOutput::Success(self.config.name.clone(), new_sha)) + sink.lock().unwrap()(WorkloadEvent::Success(self.config.name.clone(), new_sha)) .map_err(|err| GitOpsError::NotifyError(format!("{}", err)))? } Ok(Some(action_name)) => { - sink.lock().unwrap()(ActionOutput::Failure( + sink.lock().unwrap()(WorkloadEvent::Failure( self.config.name.clone(), action_name, new_sha, @@ -104,7 +104,7 @@ impl Workload for GitWorkload { .map_err(|err| GitOpsError::NotifyError(format!("{}", err)))?; } Err(err) => { - sink.lock().unwrap()(ActionOutput::Error( + sink.lock().unwrap()(WorkloadEvent::Error( self.config.name.clone(), format!("{}", err), new_sha, diff --git a/src/task/mod.rs b/src/task/mod.rs index cbf5835..babe6f0 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -15,7 +15,7 @@ pub mod scheduled; pub trait Workload { fn id(&self) -> String; fn interval(&self) -> Duration; - fn work(&self, workdir: PathBuf, current_sha: ObjectId) -> Result; + fn perform(&self, workdir: PathBuf, current_sha: ObjectId) -> Result; } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/src/task/scheduled.rs b/src/task/scheduled.rs index 0ca4c12..3ed6030 100644 --- a/src/task/scheduled.rs +++ b/src/task/scheduled.rs @@ -51,7 +51,7 @@ impl ScheduledTask { .map_err(GitOpsError::WorkDir)? .into_path(); let work = self.work.clone(); - self.worker = Some(spawn(move || work.work(workdir, current_sha))); + self.worker = Some(spawn(move || work.perform(workdir, current_sha))); Ok(()) } diff --git a/src/testutils.rs b/src/testutils.rs index 973cd47..9dad2c7 100644 --- a/src/testutils.rs +++ b/src/testutils.rs @@ -40,7 +40,7 @@ impl Workload for TestWorkload { Duration::from_secs(1) } - fn work(&self, _workdir: PathBuf, _current_sha: ObjectId) -> Result { + fn perform(&self, _workdir: PathBuf, _current_sha: ObjectId) -> Result { self.status .store(true, std::sync::atomic::Ordering::Relaxed); sleep(Duration::from_millis(10)); diff --git a/tests/workload.rs b/tests/workload.rs index a776911..3f4223c 100644 --- a/tests/workload.rs +++ b/tests/workload.rs @@ -3,25 +3,22 @@ use std::sync::{Arc, Mutex}; use clap::Parser; use gix::{hash::Kind, ObjectId}; use kitops::{ + errors::GitOpsError, opts::CliOptions, - receiver::ActionOutput, - task::{gixworkload::GitWorkload, GitTaskConfig, Workload}, errors::GitOpsError, + receiver::WorkloadEvent, + task::{gixworkload::GitWorkload, GitTaskConfig, Workload}, }; use utils::*; mod utils; fn cli_options(repodir: &tempfile::TempDir) -> CliOptions { - CliOptions::parse_from(&[ - "kitops", - "--repo-dir", - &repodir.path().to_str().unwrap(), - ]) + CliOptions::parse_from(&["kitops", "--repo-dir", &repodir.path().to_str().unwrap()]) } fn config(upstream: &tempfile::TempDir, entrypoint: &str) -> GitTaskConfig { serde_yaml::from_str(&format!( - r#" + r#" name: ze-task git: url: file://{} @@ -29,20 +26,28 @@ actions: - name: ze-action entrypoint: {} "#, - upstream.path().to_str().unwrap(), entrypoint)) + upstream.path().to_str().unwrap(), + entrypoint + )) .unwrap() } -fn non_action_events(events: Arc>>) -> Vec { +fn non_action_events(events: Arc>>) -> Vec { events .lock() .unwrap() .iter() - .filter(|e| !matches!(e, ActionOutput::Output(..) | ActionOutput::Exit(..))) + .filter(|e| { + !matches!( + e, + WorkloadEvent::ActionOutput(..) | WorkloadEvent::ActionExit(..) + ) + }) .cloned() .collect::>() } +#[cfg(unix)] #[test] fn watch_successful_workload() { let sh = shell(); @@ -61,16 +66,17 @@ fn watch_successful_workload() { Ok(()) }); let prev_sha = ObjectId::empty_tree(Kind::Sha1); - workload.work(workdir.into_path(), prev_sha).unwrap(); + workload.perform(workdir.into_path(), prev_sha).unwrap(); assert_eq!( non_action_events(events), vec![ - ActionOutput::Changes("ze-task".to_string(), prev_sha.clone(), next_sha), - ActionOutput::Success("ze-task".to_string(), next_sha), + WorkloadEvent::Changes("ze-task".to_string(), prev_sha.clone(), next_sha), + WorkloadEvent::Success("ze-task".to_string(), next_sha), ] ); } +#[cfg(unix)] #[test] fn watch_failing_workload() { let sh = shell(); @@ -88,13 +94,14 @@ fn watch_failing_workload() { Ok(()) }); let prev_sha = ObjectId::empty_tree(Kind::Sha1); - workload.work(workdir.into_path(), prev_sha).unwrap(); + workload.perform(workdir.into_path(), prev_sha).unwrap(); let events = non_action_events(events); assert_eq!(events.len(), 2); - assert!(matches!(events[0], ActionOutput::Changes(..))); - assert!(matches!(events[1], ActionOutput::Failure(..))); + assert!(matches!(events[0], WorkloadEvent::Changes(..))); + assert!(matches!(events[1], WorkloadEvent::Failure(..))); } +#[cfg(unix)] #[test] fn watch_erroring_workload() { let sh = shell(); @@ -112,10 +119,10 @@ fn watch_erroring_workload() { Ok(()) }); let prev_sha = ObjectId::empty_tree(Kind::Sha1); - let res = workload.work(workdir.into_path(), prev_sha); + let res = workload.perform(workdir.into_path(), prev_sha); assert!(matches!(res, Err(GitOpsError::ActionError(..)))); let events = non_action_events(events); assert_eq!(events.len(), 2); - assert!(matches!(events[0], ActionOutput::Changes(..))); - assert!(matches!(events[1], ActionOutput::Error(..))); + assert!(matches!(events[0], WorkloadEvent::Changes(..))); + assert!(matches!(events[1], WorkloadEvent::Error(..))); }