Skip to content

Commit

Permalink
Forgot this
Browse files Browse the repository at this point in the history
  • Loading branch information
AidanHilt committed Jan 29, 2024
1 parent ba8585a commit bb68b3f
Showing 1 changed file with 38 additions and 33 deletions.
71 changes: 38 additions & 33 deletions src/argowrapper/engine/argo_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,24 @@
from typing import Dict, List, Literal

import argo_workflows
from argo_workflows.api import archived_workflow_service_api, workflow_service_api, artifact_service_api
from argo_workflows.api import (
archived_workflow_service_api,
artifact_service_api,
workflow_service_api,
)
from argo_workflows.exceptions import NotFoundException
from argo_workflows.model.io_argoproj_workflow_v1alpha1_retry_archived_workflow_request import (
IoArgoprojWorkflowV1alpha1RetryArchivedWorkflowRequest,
)
from argo_workflows.model.io_argoproj_workflow_v1alpha1_workflow_create_request import (
IoArgoprojWorkflowV1alpha1WorkflowCreateRequest,
)
from argo_workflows.model.io_argoproj_workflow_v1alpha1_workflow_terminate_request import (
IoArgoprojWorkflowV1alpha1WorkflowTerminateRequest,
)
from argo_workflows.model.io_argoproj_workflow_v1alpha1_workflow_retry_request import (
IoArgoprojWorkflowV1alpha1WorkflowRetryRequest,
)
from argo_workflows.model.io_argoproj_workflow_v1alpha1_retry_archived_workflow_request import (
IoArgoprojWorkflowV1alpha1RetryArchivedWorkflowRequest,
from argo_workflows.model.io_argoproj_workflow_v1alpha1_workflow_terminate_request import (
IoArgoprojWorkflowV1alpha1WorkflowTerminateRequest,
)
from argo_workflows.exceptions import NotFoundException

from argowrapper import logger
from argowrapper.constants import ARGO_HOST, ARGO_NAMESPACE, WORKFLOW
Expand Down Expand Up @@ -60,7 +64,6 @@ def _get_all_workflows(self):
return self.api_instance.list_workflows(namespace=ARGO_NAMESPACE).to_str()

def _get_workflow_details_dict(self, workflow_name: str) -> Dict:

return self.api_instance.get_workflow(
namespace=ARGO_NAMESPACE,
name=workflow_name,
Expand Down Expand Up @@ -93,36 +96,38 @@ def _get_workflow_phase(self, workflow_name: str) -> str:
namespace=ARGO_NAMESPACE,
name=workflow_name,
fields="status.phase",
_check_return_type=False,
_check_return_type=False,
).to_dict()
return phase_return["status"].get("phase")

def _get_workflow_node_artifact(self, uid: str, node_id: str) -> str:
return self.artifact_api_instance.get_output_artifact_by_uid(
uid=uid,
node_id=node_id,
artifact_name="main-logs",
_check_return_type=False,
).read().decode()
return (
self.artifact_api_instance.get_output_artifact_by_uid(
uid=uid,
node_id=node_id,
artifact_name="main-logs",
_check_return_type=False,
)
.read()
.decode()
)

def _get_log_errors(self, uid: str, status_nodes_dict: Dict) -> List[Dict]:
errors = []
for node_id, step in status_nodes_dict.items():
if step.get("phase") in ("Failed", "Error") and step.get("type")=="Retry":
if step.get("phase") in ("Failed", "Error") and step.get("type") == "Retry":
message = (
step["message"] if step.get("message") else "No message provided"
)
node_type = step.get("type")
node_step = step.get("displayName")
node_step_template = step.get("templateName")
node_step_template = step.get("templateName")
node_phase = step.get("phase")
node_outputs_mainlog = self._get_workflow_node_artifact(
uid=uid,
node_id=node_id
uid=uid, node_id=node_id
)
node_log_interpreted = GWAS.interpret_gwas_workflow_error(
step_name=node_step,
step_log=node_outputs_mainlog
step_name=node_step, step_log=node_outputs_mainlog
)
errors.append(
{
Expand All @@ -133,7 +138,7 @@ def _get_log_errors(self, uid: str, status_nodes_dict: Dict) -> List[Dict]:
"step_name": node_step,
"step_template": node_step_template,
"error_message": message,
"error_interpreted": node_log_interpreted
"error_interpreted": node_log_interpreted,
}
)
else:
Expand Down Expand Up @@ -321,6 +326,7 @@ def get_workflows_for_user(self, auth_header: str) -> List[Dict]:
)
archived_workflow_list_return = (
self.archive_api_instance.list_archived_workflows(
namespace=ARGO_NAMESPACE,
list_options_label_selector=label_selector,
_check_return_type=False,
)
Expand Down Expand Up @@ -382,11 +388,11 @@ def get_workflow_logs(self, workflow_name: str, uid: str) -> List[Dict]:
archived_workflow_phase = archived_workflow_dict["status"].get("phase")
if archived_workflow_phase in ("Failed", "Error"):
archived_workflow_details_nodes = archived_workflow_dict["status"].get(
"nodes")
"nodes"
)
archived_workflow_errors = self._get_log_errors(
uid=uid,
status_nodes_dict=archived_workflow_details_nodes
)
uid=uid, status_nodes_dict=archived_workflow_details_nodes
)
return archived_workflow_errors
else:
logger.info(
Expand All @@ -404,13 +410,12 @@ def get_workflow_logs(self, workflow_name: str, uid: str) -> List[Dict]:
active_workflow_phase = self._get_workflow_phase(workflow_name)
if active_workflow_phase in ("Failed", "Error"):
active_workflow_log_return = self._get_workflow_log_dict(workflow_name)
active_workflow_details_nodes = active_workflow_log_return["status"].get(
"nodes"
)
active_workflow_details_nodes = active_workflow_log_return[
"status"
].get("nodes")
active_workflow_errors = self._get_log_errors(
uid=uid,
status_nodes_dict=active_workflow_details_nodes
)
uid=uid, status_nodes_dict=active_workflow_details_nodes
)
return active_workflow_errors
else:
logger.info(
Expand Down

0 comments on commit bb68b3f

Please sign in to comment.