diff --git a/chaoscenter/subscriber/pkg/events/definations.go b/chaoscenter/subscriber/pkg/events/definations.go index 02bd1883911..f63bf6e3121 100644 --- a/chaoscenter/subscriber/pkg/events/definations.go +++ b/chaoscenter/subscriber/pkg/events/definations.go @@ -19,7 +19,7 @@ type SubscriberEvents interface { ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error) GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error) WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) - WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error) + WorkflowEventHandler(oldObj, workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error) SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent) (string, error) WorkflowUpdates(infraData map[string]string, event chan types.WorkflowEvent) StopWorkflow(wfName string, namespace string) error diff --git a/chaoscenter/subscriber/pkg/events/workflow.go b/chaoscenter/subscriber/pkg/events/workflow.go index 6dc0bd8956b..b7c98de453d 100644 --- a/chaoscenter/subscriber/pkg/events/workflow.go +++ b/chaoscenter/subscriber/pkg/events/workflow.go @@ -72,7 +72,7 @@ func (ev *subscriberEvents) startWatchWorkflow(stopCh <-chan struct{}, s cache.S handlers := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { workflowObj := obj.(*v1alpha1.Workflow) - workflow, err := ev.WorkflowEventHandler(workflowObj, "ADD", startTime) + workflow, err := ev.WorkflowEventHandler(nil, workflowObj, "ADD", startTime) if err != nil { logrus.Error(err) } @@ -81,9 +81,10 @@ func (ev *subscriberEvents) startWatchWorkflow(stopCh <-chan struct{}, s cache.S stream <- workflow }, - UpdateFunc: func(oldObj, obj interface{}) { - workflowObj := obj.(*v1alpha1.Workflow) - workflow, err := ev.WorkflowEventHandler(workflowObj, "UPDATE", startTime) + UpdateFunc: func(oldState, newState interface{}) { + oldObj := oldState.(*v1alpha1.Workflow) + workflowObj := newState.(*v1alpha1.Workflow) + workflow, err := ev.WorkflowEventHandler(oldObj, workflowObj, "UPDATE", startTime) if err != nil { logrus.Error(err) } @@ -97,7 +98,7 @@ func (ev *subscriberEvents) startWatchWorkflow(stopCh <-chan struct{}, s cache.S } // WorkflowEventHandler is responsible for extracting the required data from the event and streaming -func (ev *subscriberEvents) WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error) { +func (ev *subscriberEvents) WorkflowEventHandler(oldObj, workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error) { if workflowObj.Labels["workflow_id"] == "" { logrus.WithFields(map[string]interface{}{ "uid": string(workflowObj.ObjectMeta.UID), @@ -124,7 +125,7 @@ func (ev *subscriberEvents) WorkflowEventHandler(workflowObj *v1alpha1.Workflow, nodes := make(map[string]types.Node) logrus.Info("Workflow RUN_ID: ", workflowObj.UID, " and event type: ", eventType) - for _, nodeStatus := range workflowObj.Status.Nodes { + for i, nodeStatus := range workflowObj.Status.Nodes { var ( nodeType = string(nodeStatus.Type) @@ -150,9 +151,16 @@ func (ev *subscriberEvents) WorkflowEventHandler(workflowObj *v1alpha1.Workflow, ChaosExp: cd, Message: nodeStatus.Message, } + if nodeType == "ChaosEngine" && cd != nil { - details.Phase = cd.ExperimentStatus + // this happens if cd.ChaosResult == nil + if oldNodeStatus, ok := oldObj.Status.Nodes[i]; ok && oldNodeStatus.Phase == "Pending" && nodeStatus.Phase == "Running" { + details.Phase = "Running" + } else { + details.Phase = cd.ExperimentStatus + } } + nodes[nodeStatus.ID] = details }