Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/subscriber: Interfaces in subscriber #4145

Merged
merged 18 commits into from
Nov 2, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: add interfaces to events
Signed-off-by: SohamRatnaparkhi <[email protected]>
SohamRatnaparkhi committed Aug 28, 2023
commit 4a823d0174e95b8c32e6c0e6d55f02d565080be5
13 changes: 6 additions & 7 deletions chaoscenter/subscriber/pkg/events/chaosengine.go
Original file line number Diff line number Diff line change
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"strconv"
"subscriber/pkg/k8s"
"subscriber/pkg/types"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
@@ -21,13 +20,13 @@ import (
)

// ChaosEventWatcher initializes the Litmus ChaosEngine event watcher
func ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
func (ev *events) ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
startTime, err := strconv.Atoi(infraData["START_TIME"])
if err != nil {
logrus.WithError(err).Fatal("failed to parse startTime")
}

cfg, err := k8s.GetKubeConfig()
cfg, err := subscriberK8s.GetKubeConfig()
if err != nil {
logrus.WithError(err).Fatal("could not get kube config")
}
@@ -87,7 +86,7 @@ func chaosEventHandler(obj interface{}, eventType string, stream chan types.Work
return
}

cfg, err := k8s.GetKubeConfig()
cfg, err := subscriberK8s.GetKubeConfig()
if err != nil {
logrus.WithError(err).Fatal("could not get kube config")
}
@@ -155,7 +154,7 @@ func chaosEventHandler(obj interface{}, eventType string, stream chan types.Work
}

// StopChaosEngineState is used to patch all the chaosEngines with engineState=stop
func StopChaosEngineState(namespace string, workflowRunID *string) error {
func (ev *events) StopChaosEngineState(namespace string, workflowRunID *string) error {
ctx := context.TODO()

//Define the GVR
@@ -166,7 +165,7 @@ func StopChaosEngineState(namespace string, workflowRunID *string) error {
}

//Generate the dynamic client
_, dynamicClient, err := k8s.GetDynamicAndDiscoveryClient()
_, dynamicClient, err := subscriberK8s.GetDynamicAndDiscoveryClient()
if err != nil {
return errors.New("failed to get dynamic client, error: " + err.Error())
}
@@ -183,7 +182,7 @@ func StopChaosEngineState(namespace string, workflowRunID *string) error {
return errors.New("failed to list chaosengines: " + err.Error())
}

//Foe every chaosEngine patch the engineState to Stop
//Foe every subscriber patch the engineState to Stop
for _, val := range chaosEngines.Items {
patch := []byte(`{"spec":{"engineState":"stop"}}`)
patched, err := dynamicClient.Resource(resourceType).Namespace(namespace).Patch(ctx, val.GetName(), mergeType.MergePatchType, patch, v1.PatchOptions{})
34 changes: 34 additions & 0 deletions chaoscenter/subscriber/pkg/events/definations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package events

import (
"subscriber/pkg/graphql"
"subscriber/pkg/types"

"subscriber/pkg/k8s"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
v1alpha12 "github.com/litmuschaos/chaos-operator/pkg/client/clientset/versioned/typed/litmuschaos/v1alpha1"
)

type SubscriberEvents interface {
ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string)
StopChaosEngineState(namespace string, workflowRunID *string) error
CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (string, *types.ChaosData, error)
GetWorkflowObj(uid string) (*v1alpha1.Workflow, error)
ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error)
GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error)
WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string)
WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error)
SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent) (string, error)
WorkflowUpdates(infraData map[string]string, event chan types.WorkflowEvent)
}

type events struct{}

func NewChaosEngine() SubscriberEvents {
return &events{}
}

var gqlSubscriberServer = graphql.NewGqlServer()
var subscriberK8s = k8s.NewKubernetes()
15 changes: 7 additions & 8 deletions chaoscenter/subscriber/pkg/events/util.go
Original file line number Diff line number Diff line change
@@ -9,7 +9,6 @@ import (
"regexp"
"strconv"
"strings"
"subscriber/pkg/k8s"
"subscriber/pkg/types"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
@@ -78,7 +77,7 @@ func getChaosData(nodeStatus v1alpha13.NodeStatus, engineName, engineNS string,
}

// CheckChaosData util function, checks if event is a chaos-exp event, if so - extract the chaos data
func CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (string, *types.ChaosData, error) {
func (ev *events) CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (string, *types.ChaosData, error) {
nodeType := string(nodeStatus.Type)
var cd *types.ChaosData = nil
// considering chaos events has only 1 artifact with manifest as raw data
@@ -91,7 +90,7 @@ func CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosCli
if nodeStatus.Phase != "Pending" {
name := obj.GetName()
if obj.GetGenerateName() != "" {
log, err := k8s.GetLogs(nodeStatus.ID, workflowNS, "main")
log, err := subscriberK8s.GetLogs(nodeStatus.ID, workflowNS, "main")
if err != nil {
return nodeType, nil, err
}
@@ -129,9 +128,9 @@ func StrConvTime(time int64) string {
}
}

func GetWorkflowObj(uid string) (*v1alpha1.Workflow, error) {
func (ev *events) GetWorkflowObj(uid string) (*v1alpha1.Workflow, error) {
ctx := context.TODO()
conf, err := k8s.GetKubeConfig()
conf, err := subscriberK8s.GetKubeConfig()
if err != nil {
return nil, err
}
@@ -152,9 +151,9 @@ func GetWorkflowObj(uid string) (*v1alpha1.Workflow, error) {
return nil, nil
}

func ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error) {
func (ev *events) ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error) {
ctx := context.TODO()
conf, err := k8s.GetKubeConfig()
conf, err := subscriberK8s.GetKubeConfig()
if err != nil {
return nil, err
}
@@ -170,7 +169,7 @@ func ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error) {
}

// GenerateWorkflowPayload generate graphql mutation payload for events event
func GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error) {
func (ev *events) GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error) {
infraID := `{infraID: \"` + cid + `\", version: \"` + version + `\", accessKey: \"` + accessKey + `\"}`
for id, event := range wfEvent.Nodes {
event.Message = strings.Replace(event.Message, `"`, ``, -1)
35 changes: 17 additions & 18 deletions chaoscenter/subscriber/pkg/events/workflow.go
Original file line number Diff line number Diff line change
@@ -8,9 +8,6 @@ import (
"strings"
"time"

"subscriber/pkg/graphql"

"subscriber/pkg/k8s"
"subscriber/pkg/types"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
@@ -39,13 +36,15 @@ var (
InfraID = os.Getenv("INFRA_ID")
)

var subscriberEventOperations SubscriberEvents = NewChaosEngine()

// WorkflowEventWatcher initializes the Argo Workflow event watcher
func WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
func (ev *events) WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
startTime, err := strconv.Atoi(infraData["START_TIME"])
if err != nil {
logrus.WithError(err).Fatal("Failed to parse START_TIME")
}
cfg, err := k8s.GetKubeConfig()
cfg, err := subscriberK8s.GetKubeConfig()
if err != nil {
logrus.WithError(err).Fatal("Could not get kube config")
}
@@ -68,15 +67,15 @@ func WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent,
informer = f.Argoproj().V1alpha1().Workflows().Informer()
// Start Event Watch
}
go startWatchWorkflow(stopCh, informer, stream, int64(startTime))
go ev.startWatchWorkflow(stopCh, informer, stream, int64(startTime))
}

// handles the different events events - add, update and delete
func startWatchWorkflow(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) {
func (ev *events) startWatchWorkflow(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) {
handlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
workflowObj := obj.(*v1alpha1.Workflow)
workflow, err := WorkflowEventHandler(workflowObj, "ADD", startTime)
workflow, err := ev.WorkflowEventHandler(workflowObj, "ADD", startTime)
if err != nil {
logrus.Error(err)
}
@@ -87,7 +86,7 @@ func startWatchWorkflow(stopCh <-chan struct{}, s cache.SharedIndexInformer, str
},
UpdateFunc: func(oldObj, obj interface{}) {
workflowObj := obj.(*v1alpha1.Workflow)
workflow, err := WorkflowEventHandler(workflowObj, "UPDATE", startTime)
workflow, err := ev.WorkflowEventHandler(workflowObj, "UPDATE", startTime)
if err != nil {
logrus.Error(err)
}
@@ -101,7 +100,7 @@ func startWatchWorkflow(stopCh <-chan struct{}, s cache.SharedIndexInformer, str
}

// WorkflowEventHandler is responsible for extracting the required data from the event and streaming
func WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error) {
func (ev *events) WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error) {
if workflowObj.Labels["workflow_id"] == "" {
logrus.WithFields(map[string]interface{}{
"uid": string(workflowObj.ObjectMeta.UID),
@@ -115,7 +114,7 @@ func WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, star
return types.WorkflowEvent{}, errors.New("startTime of subscriber is greater than experiment creation timestamp")
}

cfg, err := k8s.GetKubeConfig()
cfg, err := subscriberK8s.GetKubeConfig()
if err != nil {
logrus.WithError(err).Fatal("Could not get kube config")
}
@@ -138,7 +137,7 @@ func WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, star
// considering chaos events has only 1 artifact with manifest as raw data
if nodeStatus.Type == "Pod" && nodeStatus.Inputs != nil && len(nodeStatus.Inputs.Artifacts) == 1 && nodeStatus.Inputs.Artifacts[0].Raw != nil {
//extracts chaos data
nodeType, cd, err = CheckChaosData(nodeStatus, workflowObj.ObjectMeta.Namespace, chaosClient)
nodeType, cd, err = subscriberEventOperations.CheckChaosData(nodeStatus, workflowObj.ObjectMeta.Namespace, chaosClient)
if err != nil {
logrus.WithError(err).Print("Failed to parse ChaosEngine CRD")
}
@@ -208,7 +207,7 @@ func WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, star
}

// SendWorkflowUpdates generates graphql mutation to send events updates to graphql server
func SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent) (string, error) {
func (ev *events) SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent) (string, error) {
if wfEvent, ok := eventMap[event.UID]; ok {
for key, node := range wfEvent.Nodes {
if node.Type == "ChaosEngine" && node.ChaosExp != nil && event.Nodes[key].ChaosExp == nil {
@@ -230,28 +229,28 @@ func SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent)
eventMap[event.UID] = event

// generate graphql payload
payload, err := GenerateWorkflowPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], "false", event)
payload, err := subscriberEventOperations.GenerateWorkflowPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], "false", event)
if err != nil {
return "", errors.New("Error while generating graphql payload from the workflow event" + err.Error())
}

if event.FinishedAt != "" {
payload, err = GenerateWorkflowPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], "true", event)
payload, err = subscriberEventOperations.GenerateWorkflowPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], "true", event)
delete(eventMap, event.UID)
}

body, err := graphql.SendRequest(infraData["SERVER_ADDR"], payload)
body, err := gqlSubscriberServer.SendRequest(infraData["SERVER_ADDR"], payload)
if err != nil {
return "", err
}

return body, nil
}

func WorkflowUpdates(infraData map[string]string, event chan types.WorkflowEvent) {
func (ev *events) WorkflowUpdates(infraData map[string]string, event chan types.WorkflowEvent) {
// listen on the channel for streaming event updates
for eventData := range event {
response, err := SendWorkflowUpdates(infraData, eventData)
response, err := ev.SendWorkflowUpdates(infraData, eventData)
if err != nil {
logrus.Print(err.Error())
}