Skip to content

Commit

Permalink
[ENHC0010012A] Workflow executions: Cleanup job + service code flow (#…
Browse files Browse the repository at this point in the history
…581)

* main code flow for cleanup queuing

* fix codeflow for canceled and status jobs

* improve code flow with switch case

* fix codeflow to unreachable case

* fix existing test cases

* add destroy service tests

* simplify code

* fix tests
  • Loading branch information
JeffreyThiessen authored May 29, 2024
1 parent 4d1aa01 commit 52a179c
Show file tree
Hide file tree
Showing 33 changed files with 896 additions and 41 deletions.
4 changes: 4 additions & 0 deletions app/jobs/workflow_execution_cancelation_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class WorkflowExecutionCancelationJob < ApplicationJob
workflow_execution.state = :error
workflow_execution.http_error_code = exception.http_error_code
workflow_execution.save

WorkflowExecutionCleanupJob.set(wait_until: 30.seconds.from_now).perform_later(workflow_execution)

workflow_execution
end

def perform(workflow_execution, user)
Expand Down
14 changes: 14 additions & 0 deletions app/jobs/workflow_execution_cleanup_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

# cleans up workflow execution files no longer needed after completion
class WorkflowExecutionCleanupJob < ApplicationJob
queue_as :default

def perform(workflow_execution)
return unless workflow_execution.completed? ||
workflow_execution.canceled? ||
workflow_execution.error?

WorkflowExecutions::CleanupService.new(workflow_execution).execute
end
end
14 changes: 9 additions & 5 deletions app/jobs/workflow_execution_status_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,23 @@ class WorkflowExecutionStatusJob < ApplicationJob
workflow_execution.state = :error
workflow_execution.http_error_code = exception.http_error_code
workflow_execution.save

WorkflowExecutionCleanupJob.set(wait_until: 30.seconds.from_now).perform_later(workflow_execution)

workflow_execution
end

def perform(workflow_execution)
def perform(workflow_execution) # rubocop:disable Metrics/AbcSize
# User signaled to cancel
return if workflow_execution.canceling? || workflow_execution.canceled?

wes_connection = Integrations::Ga4ghWesApi::V1::ApiConnection.new.conn
workflow_execution = WorkflowExecutions::StatusService.new(workflow_execution, wes_connection).execute

# ga4gh has cancelled/error state
return if workflow_execution.canceled? || workflow_execution.error?

if workflow_execution.completing?
case workflow_execution.state.to_sym
when :canceled, :error
WorkflowExecutionCleanupJob.set(wait_until: 30.seconds.from_now).perform_later(workflow_execution)
when :completing
WorkflowExecutionCompletionJob.set(wait_until: 30.seconds.from_now).perform_later(workflow_execution)
else
WorkflowExecutionStatusJob.set(wait_until: 30.seconds.from_now).perform_later(workflow_execution)
Expand Down
4 changes: 4 additions & 0 deletions app/jobs/workflow_execution_submission_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ class WorkflowExecutionSubmissionJob < ApplicationJob
workflow_execution.state = :error
workflow_execution.http_error_code = exception.http_error_code
workflow_execution.save

WorkflowExecutionCleanupJob.set(wait_until: 30.seconds.from_now).perform_later(workflow_execution)

workflow_execution
end

def perform(workflow_execution)
Expand Down
2 changes: 1 addition & 1 deletion app/models/workflow_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def cancellable?
end

def deletable?
%w[completed error canceled].include?(state)
%w[completed error canceled].include?(state) && cleaned?
end

def sent_to_ga4gh?
Expand Down
3 changes: 3 additions & 0 deletions app/services/workflow_executions/cancel_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ def execute
unless @workflow_execution.sent_to_ga4gh?
@workflow_execution.state = :canceled
@workflow_execution.save

WorkflowExecutionCleanupJob.set(wait_until: 30.seconds.from_now).perform_later(@workflow_execution)

return @workflow_execution
end

Expand Down
4 changes: 4 additions & 0 deletions app/services/workflow_executions/cancelation_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ def execute
@workflow_execution.state = :canceled

@workflow_execution.save

WorkflowExecutionCleanupJob.set(wait_until: 30.seconds.from_now).perform_later(@workflow_execution)

@workflow_execution
end
end
end
4 changes: 3 additions & 1 deletion app/services/workflow_executions/completion_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def initialize(workflow_execution, params = {})
@output_base_path = "#{@workflow_execution.blob_run_directory}/output/"
end

def execute
def execute # rubocop:disable Metrics/MethodLength
return false unless @workflow_execution.completing?

run_output_data = download_decompress_parse_gziped_json("#{@output_base_path}iridanext.output.json.gz")
Expand Down Expand Up @@ -44,6 +44,8 @@ def execute

@workflow_execution.save

WorkflowExecutionCleanupJob.set(wait_until: 30.seconds.from_now).perform_later(@workflow_execution)

@workflow_execution
end

Expand Down
4 changes: 2 additions & 2 deletions app/services/workflow_executions/status_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def initialize(workflow_execution, wes_connection, user = nil, params = {})
@wes_client = Integrations::Ga4ghWesApi::V1::Client.new(conn: wes_connection)
end

def execute # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity
def execute
return false if @workflow_execution.run_id.nil?

run_status = @wes_client.get_run_status(@workflow_execution.run_id)
Expand All @@ -20,7 +20,7 @@ def execute # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComp
new_state = if state == 'RUNNING'
:running
elsif state == 'COMPLETE'
:completing if state == 'COMPLETE'
:completing
elsif Integrations::Ga4ghWesApi::V1::States::CANCELATION_STATES.include?(state)
:canceled
elsif Integrations::Ga4ghWesApi::V1::States::ERROR_STATES.include?(state)
Expand Down
209 changes: 209 additions & 0 deletions docs-site/docs/development/integration/workflow_execution_code_flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
---
sidebar_position: 2
id: workflow_execution_code_flow
title: Workflow Execution Code Flow Diagram
---

![my image](/img/workflow_execution_code2flow.png)

### code2flow.com code

```
switch (User Interactions) {
Create a new run => {
goto new_run;
}
Cancel running workflow =>{
`canceling` **State**;
goto cancel_run;
}
Destroy existing workflow => {
goto destroy;
}
}
new_run:
block {
`WorkflowExecutions\::CreateService`;
`initial` **State**;
}
block {
switch(**WorkflowExecutionPreparationJob**){
User Interrupt: Cancel => {
Job aborted;
goto cancel_run;
}
Normal execution => {
}
}
`WorkflowExecutions\::PreparationService`;
`prepared` **State**
Queue submission job;
}
block {
wesj: {
switch( **WorkflowExecutionSubmissionJob**) {
Normal Execution => {
goto wesus;
}
User Interrupt: Cancel => {
Job aborted;
goto cancel_run;
}
Connection Error =>{
goto wesj;
}
}
}
wesus: {
switch ( `WorkflowExecutions::SubmissionService`) {
ApiExceptionError => {
`error` **State**
Queue cleanup job;
goto cleanup_job;
}
Normal execution => {
`submitted` **State**
Queue status job;
goto westj;
}
}
}
}
block{
westj: {
switch( **WorkflowExecutionStatusJob**) {
Normal execution => {
goto wests;
}
User Interrupt: Cancel => {
Job aborted;
goto cancel_run;
}
Connection Error =>{
goto westj;
}
}
}
wests: {
switch (`WorkflowExecutions::StatusService`) {
ApiExceptionError => {
`error` **State**
Queue cleanup job;
goto cleanup_job;
}
Ga4gh `running` => {
Queue job again;
goto westj;
}
Ga4gh `error` / `canceled` => {
`error` **State**
Queue cleanup job;
goto cleanup_job;
}
Ga4gh `completed` => {
`completing` **State**
Queue completion job;
goto wecj;
}
}
}
}
block {
wecj:{
**WorkflowExecutionCompletionJob**
}
wecs: {
switch ( `WorkflowExecutions::CompletionService`) {
Normal execution => {
`completed` **State**
Queue cleanup job;
goto cleanup_job;
}
}
}
}
block {
cancel_run: {
switch(`WorkflowExecutions::CancelService`){
`initial` / `prepared` **States** => {
`canceled` **State**
Queue cleanup job;
goto cleanup_job;
}
all other **States** => {
`canceling` **State**
Queue cancelation job;
goto cancelation;
}
}
}
}
block {
cancelation: {
switch (**WorkflowExecutionCancelationJob**) {
ApiExceptionError=>{
switch(Run state?){
Already completed=>{
`canceled` **State**
Queue cleanup job;
goto cleanup_job;
}
Actual error=>{
`error` **State**
Queue cleanup job;
goto cleanup_job;
}
}
return;
}
Connection Error => {
goto cancelation;
}
Normal execution => {
}
}
`WorkflowExecutions\::CancelationService`;
`canceled` **State**
Queue cleanup job;
goto cleanup_job;
}
}
block {
destroy: {
`WorkflowExecutions\::DestroyService`;
switch(Check if workflow execution can be destroyed){
Check if cleaned => {
goto cleaned;
}
}
return;
}
}
block {
cleanup_job:{
**WorkflowExecutionCleanupJob**;
`WorkflowExecutions\::CleanupService`;
Sets `cleaned` to `true`
}
}
block {
cleaned:
`workflow_execution.cleaned?
}
```
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
10 changes: 10 additions & 0 deletions test/fixtures/active_storage/attachments.yml
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,13 @@ data_export_7_test_export:
name: file
record: data_export_seven (DataExport)
blob: data_export_7_test_export_blob

attachmentEndToEndFWD_file_test_file_fastq:
name: file
record: attachmentEndToEndFWD (Attachment)
blob: attachmentEndToEndFWD_file_test_file_fastq_blob

attachmentEndToEndREV_file_test_file_fastq:
name: file
record: attachmentEndToEndREV (Attachment)
blob: attachmentEndToEndREV_file_test_file_fastq_blob
3 changes: 3 additions & 0 deletions test/fixtures/active_storage/blobs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,6 @@ workflow_execution_completed_output_blob: <%= ActiveStorage::FixtureSet.blob fil
samples_workflow_execution_completed_output_blob: <%= ActiveStorage::FixtureSet.blob filename: "INXT_SAM_AAAAAAAABV.assembly.fa.gz", service_name: "test" %>

data_export_7_test_export_blob: <%= ActiveStorage::FixtureSet.blob filename: "data_export_7.zip", service_name: "test" %>

attachmentEndToEndFWD_file_test_file_fastq_blob: <%= ActiveStorage::FixtureSet.blob filename: "test_file_end_to_end_F.fastq", service_name: "test" %>
attachmentEndToEndREV_file_test_file_fastq_blob: <%= ActiveStorage::FixtureSet.blob filename: "test_file_end_to_end_R.fastq", service_name: "test" %>
Loading

0 comments on commit 52a179c

Please sign in to comment.