diff --git a/src/lib.rs b/src/lib.rs index a15a8d5..6a2eef8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,7 @@ use std::{sync::mpsc::Sender, thread::sleep, time::Duration}; +use task::{scheduled::ScheduledTask, Workload}; + pub mod actions; pub mod errors; pub mod git; @@ -7,6 +9,8 @@ pub mod opts; pub mod receiver; pub mod store; pub mod task; +#[cfg(test)] +pub(crate) mod testutils; pub(crate) mod utils; #[derive(Debug, PartialEq)] @@ -15,16 +19,16 @@ pub enum Progress { Idle, } -pub fn run_tasks( - tasks: &mut [T], +pub fn run_tasks( + tasks: &mut [ScheduledTask], mut persist: F, tx: &Sender, once_only: bool, poll_interval: Duration, ) -> Result<(), errors::GitOpsError> where - F: FnMut(&T) -> Result<(), errors::GitOpsError>, - T: task::Task, + F: FnMut(&ScheduledTask) -> Result<(), errors::GitOpsError>, + W: Workload + Clone + Send + 'static, { loop { let res = progress_one_task(tasks, &mut persist, tx)?; @@ -40,14 +44,14 @@ where } } -fn progress_one_task( - tasks: &mut [T], +fn progress_one_task( + tasks: &mut [ScheduledTask], persist: &mut F, tx: &Sender, ) -> Result where - F: FnMut(&T) -> Result<(), errors::GitOpsError>, - T: task::Task, + F: FnMut(&ScheduledTask) -> Result<(), errors::GitOpsError>, + W: Workload + Clone + Send + 'static, { if let Some(task) = tasks.iter_mut().find(|t| t.is_eligible()) { let task_tx = tx.clone(); @@ -70,101 +74,38 @@ where #[cfg(test)] mod lib { - use std::cell::RefCell; - use std::sync::mpsc::Sender; - - use crate::errors::GitOpsError; - use crate::receiver::ActionOutput; - use crate::task::{State, Task}; - - #[derive(Default)] - struct TestTask { - pub status: RefCell>, - pub eligible: RefCell, - } - - impl TestTask { - pub fn new() -> Self { - Default::default() - } - - pub fn make_eligible(&self) { - *self.eligible.borrow_mut() = true; - } + use std::{thread::sleep, time::Duration}; - pub fn run(&self) { - *self.eligible.borrow_mut() = false; - *self.status.borrow_mut() = Some(true); - } - - pub fn complete(&self) { - let mut v = self.status.borrow_mut(); - if v.is_some() { - *v = Some(false); - } - } - } - - impl Task for TestTask { - fn is_eligible(&self) -> bool { - self.status.borrow().is_none() && *self.eligible.borrow() - } - - fn is_running(&self) -> bool { - self.status.borrow().is_some_and(|s| s) - } - - fn is_finished(&self) -> bool { - self.status.borrow().is_some_and(|s| !s) - } - - fn schedule_next(&mut self) {} - - fn start(&mut self, _: Sender) -> Result<(), GitOpsError> { - assert!(*self.eligible.borrow()); - assert!(self.status.borrow().is_none()); - self.run(); - Ok(()) - } - - fn finalize(&mut self) -> Result<(), GitOpsError> { - assert!(!*self.eligible.borrow()); - assert!(self.status.borrow().is_some()); - *self.status.borrow_mut() = None; - Ok(()) - } - - fn state(&self) -> State { - todo!("Not needed") - } - - fn set_state(&mut self, _: State) { - todo!("Not needed") - } - } + use crate::{task::{scheduled::ScheduledTask, State}, testutils::TestWorkload}; #[test] - fn dont_start_ineligible_task() { - let mut tasks = vec![TestTask::new()]; + fn run_eligible_task() { + let mut tasks = vec![ScheduledTask::new(TestWorkload::default())]; let (tx, _) = std::sync::mpsc::channel(); - let mut persist = |_t: &TestTask| Ok(()); + let mut persist = |_t: &ScheduledTask| Ok(()); + let progress = super::progress_one_task(&mut tasks[..], &mut persist, &tx).unwrap(); + assert!(progress == super::Progress::Running); + assert!(tasks[0].is_running()); + while !tasks[0].is_finished() { + sleep(Duration::from_millis(2)); + } + let progress = super::progress_one_task(&mut tasks[..], &mut persist, &tx).unwrap(); + assert!(progress == super::Progress::Running); + assert!(!tasks[0].is_finished()); let progress = super::progress_one_task(&mut tasks[..], &mut persist, &tx).unwrap(); assert!(progress == super::Progress::Idle); - assert!(!tasks[0].is_running()); } #[test] - fn run_eligible_task() { - let mut tasks = vec![TestTask::new()]; + fn dont_start_ineligible_task() { + let mut tasks = vec![ScheduledTask::new(TestWorkload::default())]; + tasks[0].set_state(State { + next_run: std::time::SystemTime::now() + Duration::from_secs(1), + current_sha: gix::ObjectId::empty_blob(gix::hash::Kind::Sha1), + }); let (tx, _) = std::sync::mpsc::channel(); - let mut persist = |_t: &TestTask| Ok(()); - tasks[0].make_eligible(); + let mut persist = |_t: &ScheduledTask| Ok(()); let progress = super::progress_one_task(&mut tasks[..], &mut persist, &tx).unwrap(); - assert!(progress == super::Progress::Running); - assert!(tasks[0].is_running()); - tasks[0].complete(); - let progress = super::progress_one_task(&mut tasks[..], &mut persist, &tx).unwrap(); - assert!(progress == super::Progress::Running); - assert!(!tasks[0].is_eligible()); + assert!(progress == super::Progress::Idle); } } diff --git a/src/main.rs b/src/main.rs index 389cbc8..4f9d8f5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,8 @@ use kitops::opts::{load_store, load_tasks, CliOptions}; use kitops::receiver::logging_receiver; use kitops::run_tasks; use kitops::store::Store; -use kitops::task::{GitTask, Task}; +use kitops::task::gixworkload::GitWorkload; +use kitops::task::scheduled::ScheduledTask; use std::time::Duration; use std::{collections::HashSet, sync::mpsc::channel, thread::spawn}; @@ -20,7 +21,7 @@ fn main() -> Result<(), GitOpsError> { }); let mut tasks = load_tasks(&opts)?; let mut store = load_store(&opts)?; - let task_ids = tasks.iter().map(GitTask::id).collect::>(); + let task_ids = tasks.iter().map(ScheduledTask::id).collect::>(); store.retain(task_ids); for task in &mut tasks { if let Some(s) = store.get(&task.id()) { @@ -29,7 +30,7 @@ fn main() -> Result<(), GitOpsError> { } run_tasks( &mut tasks[..], - |t: &GitTask| store.persist(t.id(), t), + |t: &ScheduledTask| store.persist(t.id(), t), &tx, opts.once_only, Duration::from_secs(1), diff --git a/src/opts.rs b/src/opts.rs index 7102cdf..98c73b5 100644 --- a/src/opts.rs +++ b/src/opts.rs @@ -7,7 +7,7 @@ use crate::{ actions::Action, errors::GitOpsError, store::{FileStore, Store}, - task::{GitTask, GitTaskConfig}, + task::{gixworkload::GitWorkload, scheduled::ScheduledTask, GitTaskConfig}, }; const DEFAULT_BRANCH: &str = "main"; @@ -94,7 +94,7 @@ struct ConfigFile { tasks: Vec, } -fn tasks_from_file(opts: &CliOptions) -> Result, GitOpsError> { +fn tasks_from_file(opts: &CliOptions) -> Result>, GitOpsError> { let config = File::open(opts.config_file.clone().unwrap()).map_err(GitOpsError::MissingConfig)?; let config_file: ConfigFile = @@ -102,18 +102,20 @@ fn tasks_from_file(opts: &CliOptions) -> Result, GitOpsError> { Ok(config_file .tasks .into_iter() - .map(|c| GitTask::from_config(c, opts)) + .map(|c| ScheduledTask::new(GitWorkload::from_config(c, opts))) .collect()) } -fn tasks_from_opts(opts: &CliOptions) -> Result, GitOpsError> { +fn tasks_from_opts(opts: &CliOptions) -> Result>, GitOpsError> { let mut config: GitTaskConfig = TryFrom::try_from(opts)?; let action: Action = TryFrom::try_from(opts)?; config.add_action(action); - Ok(vec![GitTask::from_config(config, opts)]) + let work = GitWorkload::from_config(config, opts); + let task = ScheduledTask::new(work); + Ok(vec![task]) } -pub fn load_tasks(opts: &CliOptions) -> Result, GitOpsError> { +pub fn load_tasks(opts: &CliOptions) -> Result>, GitOpsError> { if opts.url.is_some() { tasks_from_opts(opts) } else { diff --git a/src/store.rs b/src/store.rs index da6e30b..1879c69 100644 --- a/src/store.rs +++ b/src/store.rs @@ -6,15 +6,17 @@ use std::{ use crate::{ errors::GitOpsError, - task::{State, Task}, + task::{scheduled::ScheduledTask, State, Workload}, }; pub trait Store { fn get(&self, id: &str) -> Option<&State>; fn retain(&mut self, task_ids: HashSet); - fn persist(&mut self, id: String, task: &T) -> Result<(), GitOpsError> - where - T: Task; + fn persist( + &mut self, + id: String, + task: &ScheduledTask, + ) -> Result<(), GitOpsError>; } #[derive(Debug, Default)] @@ -47,10 +49,11 @@ impl Store for FileStore { self.state.retain(|id, _| task_ids.contains(id)); } - fn persist(&mut self, id: String, task: &T) -> Result<(), GitOpsError> - where - T: Task, - { + fn persist( + &mut self, + id: String, + task: &ScheduledTask, + ) -> Result<(), GitOpsError> { self.state.insert(id, task.state()); let file = File::create(&self.path).map_err(GitOpsError::SavingState)?; serde_yaml::to_writer(file, &self.state).map_err(GitOpsError::SerdeState) diff --git a/src/task/github.rs b/src/task/github.rs index c5fc2e3..f6b36df 100644 --- a/src/task/github.rs +++ b/src/task/github.rs @@ -75,7 +75,11 @@ fn generate_jwt(config: &GitHubNotifyConfig) -> Result { .map_err(GitOpsError::GitHubBadPrivateKey) } -fn get_installation_id(config: &GitHubNotifyConfig, client: &reqwest::blocking::Client, jwt_token: &String) -> Result { +fn get_installation_id( + config: &GitHubNotifyConfig, + client: &reqwest::blocking::Client, + jwt_token: &String, +) -> Result { // TODO Is this different if we are installed organization-wise? let url = format!( "https://api.github.com/repos/{}/installation", @@ -105,7 +109,11 @@ fn get_installation_id(config: &GitHubNotifyConfig, client: &reqwest::blocking:: Ok(installation_id) } -fn get_access_token(installation_id: u64, client: &reqwest::blocking::Client, jwt_token: &String) -> Result { +fn get_access_token( + installation_id: u64, + client: &reqwest::blocking::Client, + jwt_token: &String, +) -> Result { let url = format!( "https://api.github.com/app/installations/{}/access_tokens", installation_id diff --git a/src/task/gixworkload.rs b/src/task/gixworkload.rs new file mode 100644 index 0000000..68c095a --- /dev/null +++ b/src/task/gixworkload.rs @@ -0,0 +1,77 @@ +use std::{ + path::PathBuf, + time::{Duration, Instant}, +}; + +use gix::ObjectId; + +use crate::{ + actions::run_action, errors::GitOpsError, git::ensure_worktree, opts::CliOptions, + receiver::ActionOutput, +}; + +use super::{ + github::{update_commit_status, GitHubStatus}, + GitTaskConfig, Workload, +}; + +#[derive(Clone)] +pub struct GitWorkload { + config: GitTaskConfig, + repo_dir: PathBuf, +} + +impl GitWorkload { + pub fn from_config(config: GitTaskConfig, opts: &CliOptions) -> Self { + let repo_dir = opts + .repo_dir + .as_ref() + .map(|dir| dir.join(config.git.safe_url())) + .unwrap(); + GitWorkload { config, repo_dir } + } +} + +impl Workload for GitWorkload { + fn id(&self) -> String { + self.config.name.clone() + } + + fn interval(&self) -> Duration { + self.config.interval + } + + fn work( + &self, + workdir: PathBuf, + current_sha: ObjectId, + sink: F, + ) -> Result + where + F: Fn(ActionOutput) -> Result<(), GitOpsError> + Clone + Send + 'static, + { + let config = self.config.clone(); + let task_id = config.name.clone(); + let repodir = self.repo_dir.clone(); + let deadline = Instant::now() + config.timeout; + + let new_sha = ensure_worktree(&config.git, deadline, &repodir, &workdir)?; + if current_sha != new_sha { + sink(ActionOutput::Changes( + config.name.clone(), + current_sha, + new_sha, + )) + .map_err(|err| GitOpsError::SendError(format!("{}", err)))?; + for action in config.actions { + let name = format!("{}|{}", task_id, action.id()); + run_action(&name, &action, &workdir, deadline, &sink)?; + } + } + if let Some(cfg) = config.notify { + update_commit_status(&cfg, &new_sha.to_string(), GitHubStatus::Success, "Did it")?; + } + std::fs::remove_dir_all(&workdir).map_err(GitOpsError::WorkDir)?; + Ok(new_sha) + } +} diff --git a/src/task/mod.rs b/src/task/mod.rs index 3c57ab4..827e66c 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -1,143 +1,30 @@ use std::{ - ops::Add, path::PathBuf, - sync::mpsc::Sender, - thread::{spawn, JoinHandle}, - time::{Duration, Instant, SystemTime}, + time::{Duration, SystemTime}, }; use gix::{hash::Kind, ObjectId, Url}; use serde::{Deserialize, Deserializer, Serialize}; use crate::{ - actions::{run_action, Action}, - errors::GitOpsError, - git::{ensure_worktree, GitConfig}, - opts::CliOptions, - receiver::ActionOutput, + actions::Action, errors::GitOpsError, git::GitConfig, opts::CliOptions, receiver::ActionOutput, }; -use self::github::{update_commit_status, GitHubStatus}; - pub mod github; - -pub trait Task { - fn is_eligible(&self) -> bool; - fn is_running(&self) -> bool; - fn is_finished(&self) -> bool; - fn schedule_next(&mut self); - fn start(&mut self, tx: Sender) -> Result<(), GitOpsError>; - fn finalize(&mut self) -> Result<(), GitOpsError>; - fn state(&self) -> State; - fn set_state(&mut self, state: State); -} - -pub struct GitTask { - config: GitTaskConfig, - repo_dir: PathBuf, - pub state: State, - worker: Option>>, -} - -impl GitTask { - pub fn from_config(config: GitTaskConfig, opts: &CliOptions) -> Self { - let repo_dir = opts - .repo_dir - .as_ref() - .map(|dir| dir.join(config.git.safe_url())) - .unwrap(); - GitTask { - config, - repo_dir, - state: State::default(), - worker: None, - } - } - - pub fn id(&self) -> String { - self.config.name.clone() - } - - fn work(&self, workdir: PathBuf, current_sha: ObjectId, sink: F) -> impl FnOnce() -> Result +pub mod gixworkload; +pub mod scheduled; + +pub trait Workload { + fn id(&self) -> String; + fn interval(&self) -> Duration; + fn work( + &self, + workdir: PathBuf, + current_sha: ObjectId, + sink: F, + ) -> Result where - F: Fn(ActionOutput) -> Result<(), GitOpsError> + Clone + Send + 'static, - { - let config = self.config.clone(); - let task_id = config.name.clone(); - let repodir = self.repo_dir.clone(); - let deadline = Instant::now() + config.timeout; - - move || { - let new_sha = ensure_worktree(&config.git, deadline, &repodir, &workdir)?; - if current_sha != new_sha { - sink(ActionOutput::Changes( - config.name.clone(), - current_sha, - new_sha, - )) - .map_err(|err| GitOpsError::SendError(format!("{}", err)))?; - for action in config.actions { - let name = format!("{}|{}", task_id, action.id()); - run_action(&name, &action, &workdir, deadline, &sink)?; - } - } - if let Some(cfg) = config.notify { - update_commit_status(&cfg, &new_sha.to_string(), GitHubStatus::Success, "Did it")?; - } - std::fs::remove_dir_all(&workdir).map_err(GitOpsError::WorkDir)?; - Ok(new_sha) - } - } -} - -impl Task for GitTask { - fn is_eligible(&self) -> bool { - self.worker.is_none() && self.state.next_run < SystemTime::now() - } - - fn is_running(&self) -> bool { - self.worker.as_ref().is_some_and(|h| !h.is_finished()) - } - - fn is_finished(&self) -> bool { - self.worker.as_ref().is_some_and(|h| h.is_finished()) - } - - fn schedule_next(&mut self) { - self.state.next_run = SystemTime::now().add(self.config.interval); - } - - fn start(&mut self, tx: Sender) -> Result<(), GitOpsError> { - let current_sha = self.state.current_sha; - let workdir = tempfile::tempdir() - .map_err(GitOpsError::WorkDir)? - .into_path(); - let sink = move |event| { - tx.send(event) - .map_err(|err| GitOpsError::SendError(format!("{}", err))) - }; - self.worker = Some(spawn(self.work(workdir, current_sha, sink))); - Ok(()) - } - - fn finalize(&mut self) -> Result<(), GitOpsError> { - let new_sha = self - .worker - .take() - .expect("result only called once") - .join() - .unwrap()?; - self.state.current_sha = new_sha; - Ok(()) - } - - fn state(&self) -> State { - self.state.clone() - } - - fn set_state(&mut self, state: State) { - self.state = state; - } + F: Fn(ActionOutput) -> Result<(), GitOpsError> + Clone + Send + 'static; } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/src/task/scheduled.rs b/src/task/scheduled.rs new file mode 100644 index 0000000..74cef4c --- /dev/null +++ b/src/task/scheduled.rs @@ -0,0 +1,126 @@ +use std::{ + ops::Add, + sync::mpsc::Sender, + thread::{spawn, JoinHandle}, + time::SystemTime, +}; + +use gix::ObjectId; + +use crate::{errors::GitOpsError, receiver::ActionOutput}; + +use super::{State, Workload}; + +pub struct ScheduledTask { + work: W, + pub state: State, + worker: Option>>, +} + +impl ScheduledTask { + pub fn new(work: W) -> Self { + Self { + work, + state: State::default(), + worker: None, + } + } + + pub fn id(&self) -> String { + self.work.id() + } + + pub fn is_eligible(&self) -> bool { + self.worker.is_none() && self.state.next_run < SystemTime::now() + } + + pub fn is_running(&self) -> bool { + self.worker.as_ref().is_some_and(|h| !h.is_finished()) + } + + pub fn is_finished(&self) -> bool { + self.worker.as_ref().is_some_and(|h| h.is_finished()) + } + + pub fn schedule_next(&mut self) { + self.state.next_run = SystemTime::now().add(self.work.interval()); + } + + pub fn start(&mut self, tx: Sender) -> Result<(), GitOpsError> { + let current_sha = self.state.current_sha; + let workdir = tempfile::tempdir() + .map_err(GitOpsError::WorkDir)? + .into_path(); + let sink = move |event| { + tx.send(event) + .map_err(|err| GitOpsError::SendError(format!("{}", err))) + }; + let work = self.work.clone(); + self.worker = Some(spawn(move || work.work(workdir, current_sha, sink))); + Ok(()) + } + + pub fn finalize(&mut self) -> Result<(), GitOpsError> { + let new_sha = self + .worker + .take() + .expect("result only called once") + .join() + .unwrap()?; + self.state.current_sha = new_sha; + Ok(()) + } + + pub fn state(&self) -> State { + self.state.clone() + } + + pub fn set_state(&mut self, state: State) { + self.state = state; + } +} + +#[cfg(test)] +mod tests { + use std::{time::{Duration, SystemTime}, thread::sleep}; + + use gix::ObjectId; + + use crate::{task::{scheduled::ScheduledTask, State}, testutils::TestWorkload}; + + #[test] + fn scheduled_task_flow() { + let (tx, _) = std::sync::mpsc::channel(); + let mut task = ScheduledTask::new(TestWorkload::default()); + assert!(task.is_eligible()); + assert!(!task.is_running()); + assert!(!task.is_finished()); + task.start(tx).unwrap(); + assert!(!task.is_eligible()); + assert!(task.is_running()); + assert!(!task.is_finished()); + while !task.is_finished() { + sleep(Duration::from_millis(2)); + } + assert!(!task.is_eligible()); + assert!(!task.is_running()); + task.finalize().unwrap(); + assert!(!task.is_finished()); + assert!(task.state().current_sha.is_empty_blob()); + while !task.is_eligible() { + sleep(Duration::from_millis(2)); + } + } + + #[test] + fn scheduled_task_on_existing_state() { + let mut task = ScheduledTask::new(TestWorkload::default()); + task.set_state(State { + current_sha: ObjectId::null(gix::hash::Kind::Sha1), + next_run: SystemTime::now() + Duration::from_millis(10), + }); + assert!(!task.is_eligible()); + sleep(Duration::from_millis(10)); + assert!(task.is_eligible()); + } +}