Skip to content

Commit

Permalink
Refactor tasks 2/2.
Browse files Browse the repository at this point in the history
  • Loading branch information
bittrance committed Sep 28, 2023
1 parent d463816 commit 6784851
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 240 deletions.
127 changes: 34 additions & 93 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use std::{sync::mpsc::Sender, thread::sleep, time::Duration};

use task::{scheduled::ScheduledTask, Workload};

pub mod actions;
pub mod errors;
pub mod git;
pub mod opts;
pub mod receiver;
pub mod store;
pub mod task;
#[cfg(test)]
pub(crate) mod testutils;
pub(crate) mod utils;

#[derive(Debug, PartialEq)]
Expand All @@ -15,16 +19,16 @@ pub enum Progress {
Idle,
}

pub fn run_tasks<F, T>(
tasks: &mut [T],
pub fn run_tasks<F, W>(
tasks: &mut [ScheduledTask<W>],
mut persist: F,
tx: &Sender<receiver::ActionOutput>,
once_only: bool,
poll_interval: Duration,
) -> Result<(), errors::GitOpsError>
where
F: FnMut(&T) -> Result<(), errors::GitOpsError>,
T: task::Task,
F: FnMut(&ScheduledTask<W>) -> Result<(), errors::GitOpsError>,
W: Workload + Clone + Send + 'static,
{
loop {
let res = progress_one_task(tasks, &mut persist, tx)?;
Expand All @@ -40,14 +44,14 @@ where
}
}

fn progress_one_task<F, T>(
tasks: &mut [T],
fn progress_one_task<F, W>(
tasks: &mut [ScheduledTask<W>],
persist: &mut F,
tx: &Sender<receiver::ActionOutput>,
) -> Result<Progress, errors::GitOpsError>
where
F: FnMut(&T) -> Result<(), errors::GitOpsError>,
T: task::Task,
F: FnMut(&ScheduledTask<W>) -> Result<(), errors::GitOpsError>,
W: Workload + Clone + Send + 'static,
{
if let Some(task) = tasks.iter_mut().find(|t| t.is_eligible()) {
let task_tx = tx.clone();
Expand All @@ -70,101 +74,38 @@ where

#[cfg(test)]
mod lib {
use std::cell::RefCell;
use std::sync::mpsc::Sender;

use crate::errors::GitOpsError;
use crate::receiver::ActionOutput;
use crate::task::{State, Task};

#[derive(Default)]
struct TestTask {
pub status: RefCell<Option<bool>>,
pub eligible: RefCell<bool>,
}

impl TestTask {
pub fn new() -> Self {
Default::default()
}

pub fn make_eligible(&self) {
*self.eligible.borrow_mut() = true;
}
use std::{thread::sleep, time::Duration};

pub fn run(&self) {
*self.eligible.borrow_mut() = false;
*self.status.borrow_mut() = Some(true);
}

pub fn complete(&self) {
let mut v = self.status.borrow_mut();
if v.is_some() {
*v = Some(false);
}
}
}

impl Task for TestTask {
fn is_eligible(&self) -> bool {
self.status.borrow().is_none() && *self.eligible.borrow()
}

fn is_running(&self) -> bool {
self.status.borrow().is_some_and(|s| s)
}

fn is_finished(&self) -> bool {
self.status.borrow().is_some_and(|s| !s)
}

fn schedule_next(&mut self) {}

fn start(&mut self, _: Sender<ActionOutput>) -> Result<(), GitOpsError> {
assert!(*self.eligible.borrow());
assert!(self.status.borrow().is_none());
self.run();
Ok(())
}

fn finalize(&mut self) -> Result<(), GitOpsError> {
assert!(!*self.eligible.borrow());
assert!(self.status.borrow().is_some());
*self.status.borrow_mut() = None;
Ok(())
}

fn state(&self) -> State {
todo!("Not needed")
}

fn set_state(&mut self, _: State) {
todo!("Not needed")
}
}
use crate::{task::{scheduled::ScheduledTask, State}, testutils::TestWorkload};

#[test]
fn dont_start_ineligible_task() {
let mut tasks = vec![TestTask::new()];
fn run_eligible_task() {
let mut tasks = vec![ScheduledTask::new(TestWorkload::default())];
let (tx, _) = std::sync::mpsc::channel();
let mut persist = |_t: &TestTask| Ok(());
let mut persist = |_t: &ScheduledTask<TestWorkload>| Ok(());
let progress = super::progress_one_task(&mut tasks[..], &mut persist, &tx).unwrap();
assert!(progress == super::Progress::Running);
assert!(tasks[0].is_running());
while !tasks[0].is_finished() {
sleep(Duration::from_millis(2));
}
let progress = super::progress_one_task(&mut tasks[..], &mut persist, &tx).unwrap();
assert!(progress == super::Progress::Running);
assert!(!tasks[0].is_finished());
let progress = super::progress_one_task(&mut tasks[..], &mut persist, &tx).unwrap();
assert!(progress == super::Progress::Idle);
assert!(!tasks[0].is_running());
}

#[test]
fn run_eligible_task() {
let mut tasks = vec![TestTask::new()];
fn dont_start_ineligible_task() {
let mut tasks = vec![ScheduledTask::new(TestWorkload::default())];
tasks[0].set_state(State {
next_run: std::time::SystemTime::now() + Duration::from_secs(1),
current_sha: gix::ObjectId::empty_blob(gix::hash::Kind::Sha1),
});
let (tx, _) = std::sync::mpsc::channel();
let mut persist = |_t: &TestTask| Ok(());
tasks[0].make_eligible();
let mut persist = |_t: &ScheduledTask<TestWorkload>| Ok(());
let progress = super::progress_one_task(&mut tasks[..], &mut persist, &tx).unwrap();
assert!(progress == super::Progress::Running);
assert!(tasks[0].is_running());
tasks[0].complete();
let progress = super::progress_one_task(&mut tasks[..], &mut persist, &tx).unwrap();
assert!(progress == super::Progress::Running);
assert!(!tasks[0].is_eligible());
assert!(progress == super::Progress::Idle);
}
}
7 changes: 4 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use kitops::opts::{load_store, load_tasks, CliOptions};
use kitops::receiver::logging_receiver;
use kitops::run_tasks;
use kitops::store::Store;
use kitops::task::{GitTask, Task};
use kitops::task::gixworkload::GitWorkload;
use kitops::task::scheduled::ScheduledTask;
use std::time::Duration;
use std::{collections::HashSet, sync::mpsc::channel, thread::spawn};

Expand All @@ -20,7 +21,7 @@ fn main() -> Result<(), GitOpsError> {
});
let mut tasks = load_tasks(&opts)?;
let mut store = load_store(&opts)?;
let task_ids = tasks.iter().map(GitTask::id).collect::<HashSet<_>>();
let task_ids = tasks.iter().map(ScheduledTask::id).collect::<HashSet<_>>();
store.retain(task_ids);
for task in &mut tasks {
if let Some(s) = store.get(&task.id()) {
Expand All @@ -29,7 +30,7 @@ fn main() -> Result<(), GitOpsError> {
}
run_tasks(
&mut tasks[..],
|t: &GitTask| store.persist(t.id(), t),
|t: &ScheduledTask<GitWorkload>| store.persist(t.id(), t),
&tx,
opts.once_only,
Duration::from_secs(1),
Expand Down
14 changes: 8 additions & 6 deletions src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
actions::Action,
errors::GitOpsError,
store::{FileStore, Store},
task::{GitTask, GitTaskConfig},
task::{gixworkload::GitWorkload, scheduled::ScheduledTask, GitTaskConfig},
};

const DEFAULT_BRANCH: &str = "main";
Expand Down Expand Up @@ -94,26 +94,28 @@ struct ConfigFile {
tasks: Vec<GitTaskConfig>,
}

fn tasks_from_file(opts: &CliOptions) -> Result<Vec<GitTask>, GitOpsError> {
fn tasks_from_file(opts: &CliOptions) -> Result<Vec<ScheduledTask<GitWorkload>>, GitOpsError> {
let config =
File::open(opts.config_file.clone().unwrap()).map_err(GitOpsError::MissingConfig)?;
let config_file: ConfigFile =
serde_yaml::from_reader(config).map_err(GitOpsError::MalformedConfig)?;
Ok(config_file
.tasks
.into_iter()
.map(|c| GitTask::from_config(c, opts))
.map(|c| ScheduledTask::new(GitWorkload::from_config(c, opts)))
.collect())
}

fn tasks_from_opts(opts: &CliOptions) -> Result<Vec<GitTask>, GitOpsError> {
fn tasks_from_opts(opts: &CliOptions) -> Result<Vec<ScheduledTask<GitWorkload>>, GitOpsError> {
let mut config: GitTaskConfig = TryFrom::try_from(opts)?;
let action: Action = TryFrom::try_from(opts)?;
config.add_action(action);
Ok(vec![GitTask::from_config(config, opts)])
let work = GitWorkload::from_config(config, opts);
let task = ScheduledTask::new(work);
Ok(vec![task])
}

pub fn load_tasks(opts: &CliOptions) -> Result<Vec<GitTask>, GitOpsError> {
pub fn load_tasks(opts: &CliOptions) -> Result<Vec<ScheduledTask<GitWorkload>>, GitOpsError> {
if opts.url.is_some() {
tasks_from_opts(opts)
} else {
Expand Down
19 changes: 11 additions & 8 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ use std::{

use crate::{
errors::GitOpsError,
task::{State, Task},
task::{scheduled::ScheduledTask, State, Workload},
};

pub trait Store {
fn get(&self, id: &str) -> Option<&State>;
fn retain(&mut self, task_ids: HashSet<String>);
fn persist<T>(&mut self, id: String, task: &T) -> Result<(), GitOpsError>
where
T: Task;
fn persist<W: Workload + Clone + Send + 'static>(
&mut self,
id: String,
task: &ScheduledTask<W>,
) -> Result<(), GitOpsError>;
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -47,10 +49,11 @@ impl Store for FileStore {
self.state.retain(|id, _| task_ids.contains(id));
}

fn persist<T>(&mut self, id: String, task: &T) -> Result<(), GitOpsError>
where
T: Task,
{
fn persist<W: Workload + Clone + Send + 'static>(
&mut self,
id: String,
task: &ScheduledTask<W>,
) -> Result<(), GitOpsError> {
self.state.insert(id, task.state());
let file = File::create(&self.path).map_err(GitOpsError::SavingState)?;
serde_yaml::to_writer(file, &self.state).map_err(GitOpsError::SerdeState)
Expand Down
12 changes: 10 additions & 2 deletions src/task/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ fn generate_jwt(config: &GitHubNotifyConfig) -> Result<String, GitOpsError> {
.map_err(GitOpsError::GitHubBadPrivateKey)
}

fn get_installation_id(config: &GitHubNotifyConfig, client: &reqwest::blocking::Client, jwt_token: &String) -> Result<u64, GitOpsError> {
fn get_installation_id(
config: &GitHubNotifyConfig,
client: &reqwest::blocking::Client,
jwt_token: &String,
) -> Result<u64, GitOpsError> {
// TODO Is this different if we are installed organization-wise?
let url = format!(
"https://api.github.com/repos/{}/installation",
Expand Down Expand Up @@ -105,7 +109,11 @@ fn get_installation_id(config: &GitHubNotifyConfig, client: &reqwest::blocking::
Ok(installation_id)
}

fn get_access_token(installation_id: u64, client: &reqwest::blocking::Client, jwt_token: &String) -> Result<String, GitOpsError> {
fn get_access_token(
installation_id: u64,
client: &reqwest::blocking::Client,
jwt_token: &String,
) -> Result<String, GitOpsError> {
let url = format!(
"https://api.github.com/app/installations/{}/access_tokens",
installation_id
Expand Down
77 changes: 77 additions & 0 deletions src/task/gixworkload.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::{
path::PathBuf,
time::{Duration, Instant},
};

use gix::ObjectId;

use crate::{
actions::run_action, errors::GitOpsError, git::ensure_worktree, opts::CliOptions,
receiver::ActionOutput,
};

use super::{
github::{update_commit_status, GitHubStatus},
GitTaskConfig, Workload,
};

#[derive(Clone)]
pub struct GitWorkload {
config: GitTaskConfig,
repo_dir: PathBuf,
}

impl GitWorkload {
pub fn from_config(config: GitTaskConfig, opts: &CliOptions) -> Self {
let repo_dir = opts
.repo_dir
.as_ref()
.map(|dir| dir.join(config.git.safe_url()))
.unwrap();
GitWorkload { config, repo_dir }
}
}

impl Workload for GitWorkload {
fn id(&self) -> String {
self.config.name.clone()
}

fn interval(&self) -> Duration {
self.config.interval
}

fn work<F>(
&self,
workdir: PathBuf,
current_sha: ObjectId,
sink: F,
) -> Result<ObjectId, GitOpsError>
where
F: Fn(ActionOutput) -> Result<(), GitOpsError> + Clone + Send + 'static,
{
let config = self.config.clone();
let task_id = config.name.clone();
let repodir = self.repo_dir.clone();
let deadline = Instant::now() + config.timeout;

let new_sha = ensure_worktree(&config.git, deadline, &repodir, &workdir)?;
if current_sha != new_sha {
sink(ActionOutput::Changes(
config.name.clone(),
current_sha,
new_sha,
))
.map_err(|err| GitOpsError::SendError(format!("{}", err)))?;
for action in config.actions {
let name = format!("{}|{}", task_id, action.id());
run_action(&name, &action, &workdir, deadline, &sink)?;
}
}
if let Some(cfg) = config.notify {
update_commit_status(&cfg, &new_sha.to_string(), GitHubStatus::Success, "Did it")?;
}
std::fs::remove_dir_all(&workdir).map_err(GitOpsError::WorkDir)?;
Ok(new_sha)
}
}
Loading

0 comments on commit 6784851

Please sign in to comment.