Skip to content

Commit

Permalink
ActionOutput -> WorkloadEvent and Workload::work() -> perform().
Browse files Browse the repository at this point in the history
  • Loading branch information
bittrance committed Oct 1, 2023
1 parent 95fb04d commit 32733ec
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 64 deletions.
20 changes: 12 additions & 8 deletions src/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde::Deserialize;
use crate::{
errors::GitOpsError,
opts::CliOptions,
receiver::{ActionOutput, SourceType},
receiver::{SourceType, WorkloadEvent},
};

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -86,7 +86,7 @@ fn emit_data<F, R>(
) -> JoinHandle<Result<(), GitOpsError>>
where
R: Read + Send + 'static,
F: Fn(ActionOutput) -> Result<(), GitOpsError> + Send + 'static,
F: Fn(WorkloadEvent) -> Result<(), GitOpsError> + Send + 'static,
{
let sink = Arc::clone(sink);
spawn(move || {
Expand All @@ -96,7 +96,7 @@ where
if len == 0 {
break;
}
sink.lock().unwrap()(ActionOutput::Output(
sink.lock().unwrap()(WorkloadEvent::ActionOutput(
name.clone(),
source_type,
buf[..len].into(),
Expand All @@ -114,7 +114,7 @@ pub fn run_action<F>(
sink: &Arc<Mutex<F>>,
) -> Result<ActionResult, GitOpsError>
where
F: Fn(ActionOutput) -> Result<(), GitOpsError> + Send + 'static,
F: Fn(WorkloadEvent) -> Result<(), GitOpsError> + Send + 'static,
{
let mut command = build_command(action, cwd);
let mut child = command.spawn().map_err(GitOpsError::ActionError)?;
Expand All @@ -125,7 +125,7 @@ where
emit_data(name.to_string(), stderr, SourceType::StdErr, sink);
loop {
if let Some(exit) = child.try_wait().map_err(GitOpsError::ActionError)? {
sink.lock().unwrap()(ActionOutput::Exit(name.to_string(), exit))?;
sink.lock().unwrap()(WorkloadEvent::ActionExit(name.to_string(), exit))?;
if exit.success() {
break Ok(ActionResult::Success);
} else {
Expand All @@ -134,7 +134,7 @@ where
}
if Instant::now() > deadline {
child.kill().map_err(GitOpsError::ActionError)?;
sink.lock().unwrap()(ActionOutput::Timeout(name.to_string()))?;
sink.lock().unwrap()(WorkloadEvent::Timeout(name.to_string()))?;
break Ok(ActionResult::Failure);
}
sleep(Duration::from_secs(1));
Expand Down Expand Up @@ -179,8 +179,12 @@ mod tests {
assert!(matches!(res, Ok(ActionResult::Success)));
assert_eq!(
vec![
ActionOutput::Output("test".to_owned(), SourceType::StdOut, b"test\n".to_vec()),
ActionOutput::Exit("test".to_owned(), ExitStatus::from_raw(0)),
WorkloadEvent::ActionOutput(
"test".to_owned(),
SourceType::StdOut,
b"test\n".to_vec()
),
WorkloadEvent::ActionExit("test".to_owned(), ExitStatus::from_raw(0)),
],
events.lock().unwrap()[..]
);
Expand Down
5 changes: 1 addition & 4 deletions src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ use crate::{
receiver::logging_receiver,
store::{FileStore, Store},
task::{
github::github_watcher,
gixworkload::GitWorkload,
scheduled::ScheduledTask,
GitTaskConfig,
github::github_watcher, gixworkload::GitWorkload, scheduled::ScheduledTask, GitTaskConfig,
},
};

Expand Down
24 changes: 13 additions & 11 deletions src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,42 +9,44 @@ pub enum SourceType {
}

#[derive(Clone, Debug, PartialEq)]
pub enum ActionOutput {
pub enum WorkloadEvent {
// TODO Name types would be nice
Changes(String, ObjectId, ObjectId),
Output(String, SourceType, Vec<u8>),
Exit(String, ExitStatus),
ActionOutput(String, SourceType, Vec<u8>),
ActionExit(String, ExitStatus),
Success(String, ObjectId),
Failure(String, String, ObjectId),
Error(String, String, ObjectId),
Timeout(String),
}

pub fn logging_receiver(events: &Receiver<ActionOutput>) {
pub fn logging_receiver(events: &Receiver<WorkloadEvent>) {
while let Ok(event) = events.recv() {
match event {
ActionOutput::Changes(name, prev_sha, new_sha) => {
WorkloadEvent::Changes(name, prev_sha, new_sha) => {
if prev_sha == ObjectId::null(Kind::Sha1) {
println!("{}: New repo @ {}", name, new_sha);
} else {
println!("{}: Updated repo {} -> {}", name, prev_sha, new_sha);
}
}
ActionOutput::Output(name, source_type, data) => match source_type {
WorkloadEvent::ActionOutput(name, source_type, data) => match source_type {
SourceType::StdOut => println!("{}: {}", name, String::from_utf8_lossy(&data)),
SourceType::StdErr => eprintln!("{}: {}", name, String::from_utf8_lossy(&data)),
},
ActionOutput::Exit(name, exit) => println!("{}: exited with code {}", name, exit),
ActionOutput::Success(name, new_sha) => {
WorkloadEvent::ActionExit(name, exit) => {
println!("{}: exited with code {}", name, exit)
}
WorkloadEvent::Success(name, new_sha) => {
println!("{}: actions successful for {}", name, new_sha)
}
ActionOutput::Failure(task, action, new_sha) => {
WorkloadEvent::Failure(task, action, new_sha) => {
println!("{}: action {} failed for {}", task, action, new_sha)
}
ActionOutput::Error(name, error, new_sha) => {
WorkloadEvent::Error(name, error, new_sha) => {
println!("{}: error running actions for {}: {}", name, new_sha, error)
}
ActionOutput::Timeout(name) => println!("{}: took too long", name),
WorkloadEvent::Timeout(name) => println!("{}: took too long", name),
}
}
}
16 changes: 9 additions & 7 deletions src/task/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use reqwest::{
use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::{errors::GitOpsError, opts::CliOptions, receiver::ActionOutput};
use crate::{errors::GitOpsError, opts::CliOptions, receiver::WorkloadEvent};

#[derive(Clone, Deserialize)]
pub struct GitHubNotifyConfig {
Expand Down Expand Up @@ -182,34 +182,36 @@ pub fn update_commit_status(
}
}

pub fn github_watcher(notify_config: GitHubNotifyConfig) -> impl Fn(ActionOutput) -> Result<(), GitOpsError> + Send + 'static {
pub fn github_watcher(
notify_config: GitHubNotifyConfig,
) -> impl Fn(WorkloadEvent) -> Result<(), GitOpsError> + Send + 'static {
move |event| {
match event {
ActionOutput::Changes(name, prev_sha, new_sha) => {
WorkloadEvent::Changes(name, prev_sha, new_sha) => {
update_commit_status(
&notify_config,
&new_sha,
GitHubStatus::Pending,
&format!("running {} [last success {}]", name, prev_sha),
)?;
}
ActionOutput::Success(name, new_sha) => {
WorkloadEvent::Success(name, new_sha) => {
update_commit_status(
&notify_config,
&new_sha,
GitHubStatus::Success,
&format!("{} succeeded", name),
)?;
}
ActionOutput::Failure(task, action, new_sha) => {
WorkloadEvent::Failure(task, action, new_sha) => {
update_commit_status(
&notify_config,
&new_sha,
GitHubStatus::Failure,
&format!("{} failed on action {}", task, action),
)?;
}
ActionOutput::Error(task, action, new_sha) => {
WorkloadEvent::Error(task, action, new_sha) => {
update_commit_status(
&notify_config,
&new_sha,
Expand All @@ -221,4 +223,4 @@ pub fn github_watcher(notify_config: GitHubNotifyConfig) -> impl Fn(ActionOutput
};
Ok(())
}
}
}
22 changes: 11 additions & 11 deletions src/task/gixworkload.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
path::{PathBuf, Path},
path::{Path, PathBuf},
sync::{Arc, Mutex},
time::{Duration, Instant},
};
Expand All @@ -11,7 +11,7 @@ use crate::{
errors::GitOpsError,
git::ensure_worktree,
opts::CliOptions,
receiver::ActionOutput,
receiver::WorkloadEvent,
};

use super::{GitTaskConfig, Workload};
Expand All @@ -22,7 +22,7 @@ pub struct GitWorkload {
config: GitTaskConfig,
repo_dir: PathBuf,
watchers:
Vec<Arc<Mutex<Box<dyn Fn(ActionOutput) -> Result<(), GitOpsError> + Send + 'static>>>>,
Vec<Arc<Mutex<Box<dyn Fn(WorkloadEvent) -> Result<(), GitOpsError> + Send + 'static>>>>,
}

impl GitWorkload {
Expand All @@ -41,7 +41,7 @@ impl GitWorkload {

pub fn watch(
&mut self,
watcher: impl Fn(ActionOutput) -> Result<(), GitOpsError> + Send + 'static,
watcher: impl Fn(WorkloadEvent) -> Result<(), GitOpsError> + Send + 'static,
) {
self.watchers.push(Arc::new(Mutex::new(Box::new(watcher))));
}
Expand All @@ -50,7 +50,7 @@ impl GitWorkload {
&self,
workdir: &Path,
deadline: Instant,
sink: &Arc<Mutex<impl Fn(ActionOutput) -> Result<(), GitOpsError> + Send + 'static>>,
sink: &Arc<Mutex<impl Fn(WorkloadEvent) -> Result<(), GitOpsError> + Send + 'static>>,
) -> Result<Option<String>, GitOpsError> {
for action in &self.config.actions {
let name = format!("{}|{}", self.config.name, action.id());
Expand All @@ -72,10 +72,10 @@ impl Workload for GitWorkload {
self.config.interval
}

fn work(&self, workdir: PathBuf, current_sha: ObjectId) -> Result<ObjectId, GitOpsError> {
fn perform(&self, workdir: PathBuf, current_sha: ObjectId) -> Result<ObjectId, GitOpsError> {
let deadline = Instant::now() + self.config.timeout;
let watchers = self.watchers.clone();
let sink = Arc::new(Mutex::new(move |event: ActionOutput| {
let sink = Arc::new(Mutex::new(move |event: WorkloadEvent| {
for watcher in &watchers {
watcher.lock().unwrap()(event.clone())?;
}
Expand All @@ -84,27 +84,27 @@ impl Workload for GitWorkload {

let new_sha = ensure_worktree(&self.config.git, deadline, &self.repo_dir, &workdir)?;
if current_sha != new_sha {
sink.lock().unwrap()(ActionOutput::Changes(
sink.lock().unwrap()(WorkloadEvent::Changes(
self.config.name.clone(),
current_sha,
new_sha,
))
.map_err(|err| GitOpsError::NotifyError(format!("{}", err)))?;
match self.run_actions(&workdir, deadline, &sink) {
Ok(None) => {
sink.lock().unwrap()(ActionOutput::Success(self.config.name.clone(), new_sha))
sink.lock().unwrap()(WorkloadEvent::Success(self.config.name.clone(), new_sha))
.map_err(|err| GitOpsError::NotifyError(format!("{}", err)))?
}
Ok(Some(action_name)) => {
sink.lock().unwrap()(ActionOutput::Failure(
sink.lock().unwrap()(WorkloadEvent::Failure(
self.config.name.clone(),
action_name,
new_sha,
))
.map_err(|err| GitOpsError::NotifyError(format!("{}", err)))?;
}
Err(err) => {
sink.lock().unwrap()(ActionOutput::Error(
sink.lock().unwrap()(WorkloadEvent::Error(
self.config.name.clone(),
format!("{}", err),
new_sha,
Expand Down
2 changes: 1 addition & 1 deletion src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub mod scheduled;
pub trait Workload {
fn id(&self) -> String;
fn interval(&self) -> Duration;
fn work(&self, workdir: PathBuf, current_sha: ObjectId) -> Result<ObjectId, GitOpsError>;
fn perform(&self, workdir: PathBuf, current_sha: ObjectId) -> Result<ObjectId, GitOpsError>;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion src/task/scheduled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<W: Workload + Clone + Send + 'static> ScheduledTask<W> {
.map_err(GitOpsError::WorkDir)?
.into_path();
let work = self.work.clone();
self.worker = Some(spawn(move || work.work(workdir, current_sha)));
self.worker = Some(spawn(move || work.perform(workdir, current_sha)));
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion src/testutils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Workload for TestWorkload {
Duration::from_secs(1)
}

fn work(&self, _workdir: PathBuf, _current_sha: ObjectId) -> Result<ObjectId, GitOpsError> {
fn perform(&self, _workdir: PathBuf, _current_sha: ObjectId) -> Result<ObjectId, GitOpsError> {
self.status
.store(true, std::sync::atomic::Ordering::Relaxed);
sleep(Duration::from_millis(10));
Expand Down
Loading

0 comments on commit 32733ec

Please sign in to comment.