Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/subscriber: Interfaces in subscriber #4145

Merged
merged 18 commits into from
Nov 2, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions chaoscenter/subscriber/pkg/events/chaosengine.go
Original file line number Diff line number Diff line change
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"strconv"
"subscriber/pkg/k8s"
"subscriber/pkg/types"

wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
@@ -23,13 +22,13 @@ import (
)

// ChaosEventWatcher initializes the Litmus ChaosEngine event watcher
func ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
func (ev *subscriberEvents) ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
startTime, err := strconv.Atoi(infraData["START_TIME"])
if err != nil {
logrus.WithError(err).Fatal("failed to parse startTime")
}

cfg, err := k8s.GetKubeConfig()
cfg, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
logrus.WithError(err).Fatal("could not get kube config")
}
@@ -55,17 +54,17 @@ func ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, in
informer = f.Litmuschaos().V1alpha1().ChaosEngines().Informer()
}

go startWatchEngine(stopCh, informer, stream, int64(startTime))
go ev.startWatchEngine(stopCh, informer, stream, int64(startTime))
}

// handles the different events events - add, update and delete
func startWatchEngine(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) {
// handles the different*subscriberEvents - add, update and delete
func (ev *subscriberEvents) startWatchEngine(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) {
handlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
chaosEventHandler(obj, "ADD", stream, startTime)
ev.chaosEventHandler(obj, "ADD", stream, startTime)
},
UpdateFunc: func(oldObj, obj interface{}) {
chaosEventHandler(obj, "UPDATE", stream, startTime)
ev.chaosEventHandler(obj, "UPDATE", stream, startTime)
},
}

@@ -74,7 +73,7 @@ func startWatchEngine(stopCh <-chan struct{}, s cache.SharedIndexInformer, strea
}

// responsible for extracting the required data from the event and streaming
func chaosEventHandler(obj interface{}, eventType string, stream chan types.WorkflowEvent, startTime int64) {
func (ev *subscriberEvents) chaosEventHandler(obj interface{}, eventType string, stream chan types.WorkflowEvent, startTime int64) {
workflowObj := obj.(*chaosTypes.ChaosEngine)
if workflowObj.Labels["workflow_id"] == "" {
logrus.WithFields(map[string]interface{}{
@@ -89,7 +88,7 @@ func chaosEventHandler(obj interface{}, eventType string, stream chan types.Work
return
}

cfg, err := k8s.GetKubeConfig()
cfg, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
logrus.WithError(err).Fatal("could not get kube config")
}
@@ -104,12 +103,12 @@ func chaosEventHandler(obj interface{}, eventType string, stream chan types.Work
var cd *types.ChaosData = nil

//extracts chaos data
cd, err = getChaosData(v1alpha1.NodeStatus{StartedAt: workflowObj.ObjectMeta.CreationTimestamp}, workflowObj.Name, workflowObj.Namespace, chaosClient)
cd, err = ev.getChaosData(v1alpha1.NodeStatus{StartedAt: workflowObj.ObjectMeta.CreationTimestamp}, workflowObj.Name, workflowObj.Namespace, chaosClient)
if err != nil {
logrus.WithError(err).Print("FAILED PARSING CHAOS ENGINE CRD")
}

// considering chaos events has only 1 artifact with manifest as raw data
// considering chaos*subscriberEvents has only 1 artifact with manifest as raw data
finTime := int64(-1)
if workflowObj.Status.EngineStatus == chaosTypes.EngineStatusCompleted || workflowObj.Status.EngineStatus == chaosTypes.EngineStatusStopped {
if len(workflowObj.Status.Experiments) > 0 {
@@ -157,7 +156,7 @@ func chaosEventHandler(obj interface{}, eventType string, stream chan types.Work
}

// StopChaosEngineState is used to patch all the chaosEngines with engineState=stop
func StopChaosEngineState(namespace string, workflowRunID *string) error {
func (ev *subscriberEvents) StopChaosEngineState(namespace string, workflowRunID *string) error {
ctx := context.TODO()

//Define the GVR
@@ -168,7 +167,7 @@ func StopChaosEngineState(namespace string, workflowRunID *string) error {
}

//Generate the dynamic client
_, dynamicClient, err := k8s.GetDynamicAndDiscoveryClient()
_, dynamicClient, err := ev.subscriberK8s.GetDynamicAndDiscoveryClient()
if err != nil {
return errors.New("failed to get dynamic client, error: " + err.Error())
}
@@ -185,7 +184,7 @@ func StopChaosEngineState(namespace string, workflowRunID *string) error {
return errors.New("failed to list chaosengines: " + err.Error())
}

//Foe every chaosEngine patch the engineState to Stop
//Foe every subscriber patch the engineState to Stop
for _, val := range chaosEngines.Items {
patch := []byte(`{"spec":{"engineState":"stop"}}`)
patched, err := dynamicClient.Resource(resourceType).Namespace(namespace).Patch(ctx, val.GetName(), mergeType.MergePatchType, patch, v1.PatchOptions{})
@@ -200,9 +199,9 @@ func StopChaosEngineState(namespace string, workflowRunID *string) error {
}

// StopWorkflow will patch the workflow based on workflow name using the shutdown strategy
func StopWorkflow(wfName string, namespace string) error {
func (ev *subscriberEvents) StopWorkflow(wfName string, namespace string) error {

conf, err := k8s.GetKubeConfig()
conf, err := ev.subscriberK8s.GetKubeConfig()
wfClient := wfclientset.NewForConfigOrDie(conf).ArgoprojV1alpha1().Workflows(namespace)
patch := []byte(`{"spec":{"shutdown":"Stop"}}`)
wf, err := wfClient.Patch(context.TODO(), wfName, mergeType.MergePatchType, patch, v1.PatchOptions{})
38 changes: 38 additions & 0 deletions chaoscenter/subscriber/pkg/events/definations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package events

import (
"subscriber/pkg/graphql"
"subscriber/pkg/types"

"subscriber/pkg/k8s"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
v1alpha12 "github.com/litmuschaos/chaos-operator/pkg/client/clientset/versioned/typed/litmuschaos/v1alpha1"
)

type SubscriberEvents interface {
ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string)
StopChaosEngineState(namespace string, workflowRunID *string) error
CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (string, *types.ChaosData, error)
GetWorkflowObj(uid string) (*v1alpha1.Workflow, error)
ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error)
GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error)
WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string)
WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error)
SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent) (string, error)
WorkflowUpdates(infraData map[string]string, event chan types.WorkflowEvent)
StopWorkflow(wfName string, namespace string) error
}

type subscriberEvents struct {
gqlSubscriberServer graphql.SubscriberGql
subscriberK8s k8s.SubscriberK8s
}

func NewSubscriberEventsOperator(gqlSubscriberServer graphql.SubscriberGql, subscriberK8s k8s.SubscriberK8s) SubscriberEvents {
return &subscriberEvents{
gqlSubscriberServer: gqlSubscriberServer,
subscriberK8s: subscriberK8s,
}
}
19 changes: 9 additions & 10 deletions chaoscenter/subscriber/pkg/events/util.go
Original file line number Diff line number Diff line change
@@ -9,7 +9,6 @@ import (
"regexp"
"strconv"
"strings"
"subscriber/pkg/k8s"
"subscriber/pkg/types"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
@@ -24,7 +23,7 @@ import (
)

// util function, extracts the chaos data using the litmus go-client
func getChaosData(nodeStatus v1alpha13.NodeStatus, engineName, engineNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (*types.ChaosData, error) {
func (ev *subscriberEvents) getChaosData(nodeStatus v1alpha13.NodeStatus, engineName, engineNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (*types.ChaosData, error) {
cd := &types.ChaosData{}
cd.EngineName = engineName
cd.Namespace = engineNS
@@ -79,7 +78,7 @@ func getChaosData(nodeStatus v1alpha13.NodeStatus, engineName, engineNS string,
}

// CheckChaosData util function, checks if event is a chaos-exp event, if so - extract the chaos data
func CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (string, *types.ChaosData, error) {
func (ev *subscriberEvents) CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (string, *types.ChaosData, error) {
nodeType := string(nodeStatus.Type)
var cd *types.ChaosData = nil
// considering chaos events has only 1 artifact with manifest as raw data
@@ -92,7 +91,7 @@ func CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosCli
if nodeStatus.Phase != "Pending" {
name := obj.GetName()
if obj.GetGenerateName() != "" {
log, err := k8s.GetLogs(nodeStatus.ID, workflowNS, "main")
log, err := ev.subscriberK8s.GetLogs(nodeStatus.ID, workflowNS, "main")
if err != nil {
return nodeType, nil, err
}
@@ -101,7 +100,7 @@ func CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosCli
return nodeType, nil, errors.New("Chaos-Engine Generated Name couldn't be retrieved")
}
}
cd, err = getChaosData(nodeStatus, name, obj.GetNamespace(), chaosClient)
cd, err = ev.getChaosData(nodeStatus, name, obj.GetNamespace(), chaosClient)
return nodeType, cd, err
}
}
@@ -130,9 +129,9 @@ func StrConvTime(time int64) string {
}
}

func GetWorkflowObj(uid string) (*v1alpha1.Workflow, error) {
func (ev *subscriberEvents) GetWorkflowObj(uid string) (*v1alpha1.Workflow, error) {
ctx := context.TODO()
conf, err := k8s.GetKubeConfig()
conf, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
return nil, err
}
@@ -153,9 +152,9 @@ func GetWorkflowObj(uid string) (*v1alpha1.Workflow, error) {
return nil, nil
}

func ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error) {
func (ev *subscriberEvents) ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error) {
ctx := context.TODO()
conf, err := k8s.GetKubeConfig()
conf, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
return nil, err
}
@@ -171,7 +170,7 @@ func ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error) {
}

// GenerateWorkflowPayload generate graphql mutation payload for events event
func GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error) {
func (ev *subscriberEvents) GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error) {
infraID := `{infraID: \"` + cid + `\", version: \"` + version + `\", accessKey: \"` + accessKey + `\"}`
for id, event := range wfEvent.Nodes {
event.Message = strings.Replace(event.Message, `"`, ``, -1)
33 changes: 15 additions & 18 deletions chaoscenter/subscriber/pkg/events/workflow.go
Original file line number Diff line number Diff line change
@@ -7,9 +7,6 @@ import (
"strconv"
"time"

"subscriber/pkg/graphql"

"subscriber/pkg/k8s"
"subscriber/pkg/types"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
@@ -39,12 +36,12 @@ var (
)

// WorkflowEventWatcher initializes the Argo Workflow event watcher
func WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
func (ev *subscriberEvents) WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
startTime, err := strconv.Atoi(infraData["START_TIME"])
if err != nil {
logrus.WithError(err).Fatal("Failed to parse START_TIME")
}
cfg, err := k8s.GetKubeConfig()
cfg, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
logrus.WithError(err).Fatal("Could not get kube config")
}
@@ -67,15 +64,15 @@ func WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent,
informer = f.Argoproj().V1alpha1().Workflows().Informer()
// Start Event Watch
}
go startWatchWorkflow(stopCh, informer, stream, int64(startTime))
go ev.startWatchWorkflow(stopCh, informer, stream, int64(startTime))
}

// handles the different events events - add, update and delete
func startWatchWorkflow(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) {
func (ev *subscriberEvents) startWatchWorkflow(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) {
handlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
workflowObj := obj.(*v1alpha1.Workflow)
workflow, err := WorkflowEventHandler(workflowObj, "ADD", startTime)
workflow, err := ev.WorkflowEventHandler(workflowObj, "ADD", startTime)
if err != nil {
logrus.Error(err)
}
@@ -86,7 +83,7 @@ func startWatchWorkflow(stopCh <-chan struct{}, s cache.SharedIndexInformer, str
},
UpdateFunc: func(oldObj, obj interface{}) {
workflowObj := obj.(*v1alpha1.Workflow)
workflow, err := WorkflowEventHandler(workflowObj, "UPDATE", startTime)
workflow, err := ev.WorkflowEventHandler(workflowObj, "UPDATE", startTime)
if err != nil {
logrus.Error(err)
}
@@ -100,7 +97,7 @@ func startWatchWorkflow(stopCh <-chan struct{}, s cache.SharedIndexInformer, str
}

// WorkflowEventHandler is responsible for extracting the required data from the event and streaming
func WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error) {
func (ev *subscriberEvents) WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error) {
if workflowObj.Labels["workflow_id"] == "" {
logrus.WithFields(map[string]interface{}{
"uid": string(workflowObj.ObjectMeta.UID),
@@ -114,7 +111,7 @@ func WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, star
return types.WorkflowEvent{}, errors.New("startTime of subscriber is greater than experiment creation timestamp")
}

cfg, err := k8s.GetKubeConfig()
cfg, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
logrus.WithError(err).Fatal("Could not get kube config")
}
@@ -137,7 +134,7 @@ func WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, star
// considering chaos events has only 1 artifact with manifest as raw data
if nodeStatus.Type == "Pod" && nodeStatus.Inputs != nil && len(nodeStatus.Inputs.Artifacts) == 1 && nodeStatus.Inputs.Artifacts[0].Raw != nil {
//extracts chaos data
nodeType, cd, err = CheckChaosData(nodeStatus, workflowObj.ObjectMeta.Namespace, chaosClient)
nodeType, cd, err = ev.CheckChaosData(nodeStatus, workflowObj.ObjectMeta.Namespace, chaosClient)
if err != nil {
logrus.WithError(err).Print("Failed to parse ChaosEngine CRD")
}
@@ -202,7 +199,7 @@ func WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, star
}

// SendWorkflowUpdates generates graphql mutation to send events updates to graphql server
func SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent) (string, error) {
func (ev *subscriberEvents) SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent) (string, error) {
if wfEvent, ok := eventMap[event.UID]; ok {
for key, node := range wfEvent.Nodes {
if node.Type == "ChaosEngine" && node.ChaosExp != nil && event.Nodes[key].ChaosExp == nil {
@@ -230,28 +227,28 @@ func SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent)
eventMap[event.UID] = event

// generate graphql payload
payload, err := GenerateWorkflowPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], "false", event)
payload, err := ev.GenerateWorkflowPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], "false", event)
if err != nil {
return "", errors.New("Error while generating graphql payload from the workflow event" + err.Error())
}

if event.FinishedAt != "" {
payload, err = GenerateWorkflowPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], "true", event)
payload, err = ev.GenerateWorkflowPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], "true", event)
delete(eventMap, event.UID)
}

body, err := graphql.SendRequest(infraData["SERVER_ADDR"], payload)
body, err := ev.gqlSubscriberServer.SendRequest(infraData["SERVER_ADDR"], payload)
if err != nil {
return "", err
}

return body, nil
}

func WorkflowUpdates(infraData map[string]string, event chan types.WorkflowEvent) {
func (ev *subscriberEvents) WorkflowUpdates(infraData map[string]string, event chan types.WorkflowEvent) {
// listen on the channel for streaming event updates
for eventData := range event {
response, err := SendWorkflowUpdates(infraData, eventData)
response, err := ev.SendWorkflowUpdates(infraData, eventData)
if err != nil {
logrus.Print(err.Error())
}
13 changes: 13 additions & 0 deletions chaoscenter/subscriber/pkg/graphql/definations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package graphql

type SubscriberGql interface {
SendRequest(server string, payload []byte) (string, error)
MarshalGQLData(gqlData interface{}) (string, error)
}

type subscriberGql struct {
}

func NewSubscriberGql() SubscriberGql {
return &subscriberGql{}
}
4 changes: 2 additions & 2 deletions chaoscenter/subscriber/pkg/graphql/operations.go
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ import (
"strings"
)

func SendRequest(server string, payload []byte) (string, error) {
func (gql *subscriberGql) SendRequest(server string, payload []byte) (string, error) {
req, err := http.NewRequest("POST", server, bytes.NewBuffer(payload))
if err != nil {
return "", err
@@ -30,7 +30,7 @@ func SendRequest(server string, payload []byte) (string, error) {
}

// MarshalGQLData processes event data into proper format acceptable by graphql
func MarshalGQLData(gqlData interface{}) (string, error) {
func (gql *subscriberGql) MarshalGQLData(gqlData interface{}) (string, error) {
data, err := json.Marshal(gqlData)
if err != nil {
return "", err
14 changes: 7 additions & 7 deletions chaoscenter/subscriber/pkg/k8s/client.go
Original file line number Diff line number Diff line change
@@ -13,16 +13,16 @@ import (
var KubeConfig *string

// getKubeConfig setup the config for access cluster resource
func GetKubeConfig() (*rest.Config, error) {
func (k8s *k8sSubscriber) GetKubeConfig() (*rest.Config, error) {
// Use in-cluster config if kubeconfig path is not specified
if *KubeConfig == "" {
return rest.InClusterConfig()
}
return clientcmd.BuildConfigFromFlags("", *KubeConfig)
}

func GetGenericK8sClient() (*kubernetes.Clientset, error) {
config, err := GetKubeConfig()
func (k8s *k8sSubscriber) GetGenericK8sClient() (*kubernetes.Clientset, error) {
config, err := k8s.GetKubeConfig()
if err != nil {
return nil, err
}
@@ -31,9 +31,9 @@ func GetGenericK8sClient() (*kubernetes.Clientset, error) {
}

// This function returns dynamic client and discovery client
func GetDynamicAndDiscoveryClient() (discovery.DiscoveryInterface, dynamic.Interface, error) {
func (k8s *k8sSubscriber) GetDynamicAndDiscoveryClient() (discovery.DiscoveryInterface, dynamic.Interface, error) {
// returns a config object which uses the service account kubernetes gives to pods
config, err := GetKubeConfig()
config, err := k8s.GetKubeConfig()
if err != nil {
return nil, nil, err
}
@@ -53,10 +53,10 @@ func GetDynamicAndDiscoveryClient() (discovery.DiscoveryInterface, dynamic.Inter
return discoveryClient, dynamicClient, nil
}

func GenerateArgoClient(namespace string) (v1alpha12.WorkflowInterface, error) {
func (k8s *k8sSubscriber) GenerateArgoClient(namespace string) (v1alpha12.WorkflowInterface, error) {

//List all chaosEngines present in the particular namespace
conf, err := GetKubeConfig()
conf, err := k8s.GetKubeConfig()
if err != nil {
return nil, err
}
44 changes: 44 additions & 0 deletions chaoscenter/subscriber/pkg/k8s/defination.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package k8s

import (
"subscriber/pkg/graphql"
"subscriber/pkg/types"

v1alpha12 "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

type SubscriberK8s interface {
GetLogs(podName, namespace, container string) (string, error)
CreatePodLog(podLog types.PodLogRequest) (types.PodLog, error)
SendPodLogs(infraData map[string]string, podLog types.PodLogRequest)
GenerateLogPayload(cid, accessKey, version string, podLog types.PodLogRequest) ([]byte, error)
GetKubernetesObjects(request types.KubeObjRequest) ([]*types.KubeObject, error)
GetObjectDataByNamespace(namespace string, dynamicClient dynamic.Interface, resourceType schema.GroupVersionResource) ([]types.ObjectData, error)
GenerateKubeObject(cid string, accessKey, version string, kubeobjectrequest types.KubeObjRequest) ([]byte, error)
SendKubeObjects(infraData map[string]string, kubeobjectrequest types.KubeObjRequest) error
CheckComponentStatus(componentEnv string) error
IsAgentConfirmed() (bool, string, error)
AgentRegister(infraData map[string]string) (bool, error)
AgentOperations(infraAction types.Action) (*unstructured.Unstructured, error)
AgentConfirm(infraData map[string]string) ([]byte, error)
GetKubeConfig() (*rest.Config, error)
GetGenericK8sClient() (*kubernetes.Clientset, error)
GetDynamicAndDiscoveryClient() (discovery.DiscoveryInterface, dynamic.Interface, error)
GenerateArgoClient(namespace string) (v1alpha12.WorkflowInterface, error)
}

type k8sSubscriber struct {
gqlSubscriberServer graphql.SubscriberGql
}

func NewK8sSubscriber(gqlSubscriberServer graphql.SubscriberGql) SubscriberK8s {
return &k8sSubscriber{
gqlSubscriberServer: gqlSubscriberServer,
}
}
25 changes: 12 additions & 13 deletions chaoscenter/subscriber/pkg/k8s/log.go
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@ import (
"strconv"
"strings"

"subscriber/pkg/graphql"
"subscriber/pkg/types"

"github.com/sirupsen/logrus"
@@ -16,9 +15,9 @@ import (
"k8s.io/client-go/kubernetes"
)

func GetLogs(podName, namespace, container string) (string, error) {
func (k8s *k8sSubscriber) GetLogs(podName, namespace, container string) (string, error) {
ctx := context.TODO()
conf, err := GetKubeConfig()
conf, err := k8s.GetKubeConfig()
if err != nil {
return "", err
}
@@ -54,9 +53,9 @@ func GetLogs(podName, namespace, container string) (string, error) {
}

// create pod log for normal pods and chaos-engine pods
func CreatePodLog(podLog types.PodLogRequest) (types.PodLog, error) {
func (k8s *k8sSubscriber) CreatePodLog(podLog types.PodLogRequest) (types.PodLog, error) {
logDetails := types.PodLog{}
mainLog, err := GetLogs(podLog.PodName, podLog.PodNamespace, "main")
mainLog, err := k8s.GetLogs(podLog.PodName, podLog.PodNamespace, "main")
// try getting argo pod logs
if err != nil {
logrus.Errorf("Failed to get argo pod %v logs, err: %v", podLog.PodName, err)
@@ -69,7 +68,7 @@ func CreatePodLog(podLog types.PodLogRequest) (types.PodLog, error) {
if strings.ToLower(podLog.PodType) == "chaosengine" && podLog.ChaosNamespace != nil {
chaosLog := make(map[string]string)
if podLog.ExpPod != nil {
expLog, err := GetLogs(*podLog.ExpPod, *podLog.ChaosNamespace, "")
expLog, err := k8s.GetLogs(*podLog.ExpPod, *podLog.ChaosNamespace, "")
if err == nil {
chaosLog[*podLog.ExpPod] = strconv.Quote(strings.Replace(expLog, `"`, `'`, -1))
chaosLog[*podLog.ExpPod] = chaosLog[*podLog.ExpPod][1 : len(chaosLog[*podLog.ExpPod])-1]
@@ -78,7 +77,7 @@ func CreatePodLog(podLog types.PodLogRequest) (types.PodLog, error) {
}
}
if podLog.RunnerPod != nil {
runnerLog, err := GetLogs(*podLog.RunnerPod, *podLog.ChaosNamespace, "")
runnerLog, err := k8s.GetLogs(*podLog.RunnerPod, *podLog.ChaosNamespace, "")
if err == nil {
chaosLog[*podLog.RunnerPod] = strconv.Quote(strings.Replace(runnerLog, `"`, `'`, -1))
chaosLog[*podLog.RunnerPod] = chaosLog[*podLog.RunnerPod][1 : len(chaosLog[*podLog.RunnerPod])-1]
@@ -96,28 +95,28 @@ func CreatePodLog(podLog types.PodLogRequest) (types.PodLog, error) {
}

// SendPodLogs generates graphql mutation to send events updates to graphql server
func SendPodLogs(infraData map[string]string, podLog types.PodLogRequest) {
func (k8s *k8sSubscriber) SendPodLogs(infraData map[string]string, podLog types.PodLogRequest) {
// generate graphql payload
payload, err := GenerateLogPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], podLog)
payload, err := k8s.GenerateLogPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], podLog)
if err != nil {
logrus.WithError(err).Print("Error while retrieving the workflow logs")
}
body, err := graphql.SendRequest(infraData["SERVER_ADDR"], payload)
body, err := k8s.gqlSubscriberServer.SendRequest(infraData["SERVER_ADDR"], payload)
if err != nil {
logrus.Print(err.Error())
}
logrus.Print("Response from the server: ", body)
}

func GenerateLogPayload(cid, accessKey, version string, podLog types.PodLogRequest) ([]byte, error) {
func (k8s *k8sSubscriber) GenerateLogPayload(cid, accessKey, version string, podLog types.PodLogRequest) ([]byte, error) {
infraID := `{infraID: \"` + cid + `\", version: \"` + version + `\", accessKey: \"` + accessKey + `\"}`
processed := " Could not get logs "

// get the logs
logDetails, err := CreatePodLog(podLog)
logDetails, err := k8s.CreatePodLog(podLog)
if err == nil {
// process log data
processed, err = graphql.MarshalGQLData(logDetails)
processed, err = k8s.gqlSubscriberServer.MarshalGQLData(logDetails)
if err != nil {
processed = " Could not get logs "
}
30 changes: 14 additions & 16 deletions chaoscenter/subscriber/pkg/k8s/objects.go
Original file line number Diff line number Diff line change
@@ -8,8 +8,6 @@ import (
"os"
"strings"

"subscriber/pkg/graphql"

"github.com/sirupsen/logrus"

"subscriber/pkg/types"
@@ -26,8 +24,8 @@ var (
)

// GetKubernetesObjects is used to get the Kubernetes Object details according to the request type
func GetKubernetesObjects(request types.KubeObjRequest) ([]*types.KubeObject, error) {
conf, err := GetKubeConfig()
func (k8s *k8sSubscriber) GetKubernetesObjects(request types.KubeObjRequest) ([]*types.KubeObject, error) {
conf, err := k8s.GetKubeConfig()
if err != nil {
return nil, err
}
@@ -41,14 +39,14 @@ func GetKubernetesObjects(request types.KubeObjRequest) ([]*types.KubeObject, er
Version: request.KubeGVRRequest.Version,
Resource: request.KubeGVRRequest.Resource,
}
_, dynamicClient, err := GetDynamicAndDiscoveryClient()
_, dynamicClient, err := k8s.GetDynamicAndDiscoveryClient()
if err != nil {
return nil, err
}
var ObjData []*types.KubeObject

if strings.ToLower(InfraScope) == "namespace" {
dataList, err := GetObjectDataByNamespace(InfraNamespace, dynamicClient, resourceType)
dataList, err := k8s.GetObjectDataByNamespace(InfraNamespace, dynamicClient, resourceType)
if err != nil {
return nil, err
}
@@ -65,7 +63,7 @@ func GetKubernetesObjects(request types.KubeObjRequest) ([]*types.KubeObject, er

if len(namespace.Items) > 0 {
for _, namespace := range namespace.Items {
podList, err := GetObjectDataByNamespace(namespace.GetName(), dynamicClient, resourceType)
podList, err := k8s.GetObjectDataByNamespace(namespace.GetName(), dynamicClient, resourceType)
if err != nil {
return nil, err
}
@@ -90,7 +88,7 @@ func GetKubernetesObjects(request types.KubeObjRequest) ([]*types.KubeObject, er
}

// GetObjectDataByNamespace uses dynamic client to fetch Kubernetes Objects data.
func GetObjectDataByNamespace(namespace string, dynamicClient dynamic.Interface, resourceType schema.GroupVersionResource) ([]types.ObjectData, error) {
func (k8s *k8sSubscriber) GetObjectDataByNamespace(namespace string, dynamicClient dynamic.Interface, resourceType schema.GroupVersionResource) ([]types.ObjectData, error) {
list, err := dynamicClient.Resource(resourceType).Namespace(namespace).List(context.TODO(), metav1.ListOptions{})
var kubeObjects []types.ObjectData
if err != nil {
@@ -104,14 +102,14 @@ func GetObjectDataByNamespace(namespace string, dynamicClient dynamic.Interface,
APIVersion: list.GetAPIVersion(),
CreationTimestamp: list.GetCreationTimestamp(),
TerminationGracePeriods: list.GetDeletionGracePeriodSeconds(),
Labels: updateLabels(list.GetLabels()),
Labels: k8s.updateLabels(list.GetLabels()),
}
kubeObjects = append(kubeObjects, listInfo)
}
return kubeObjects, nil
}

func updateLabels(labels map[string]string) []string {
func (k8s *k8sSubscriber) updateLabels(labels map[string]string) []string {
var updatedLabels []string

for k, v := range labels {
@@ -120,13 +118,13 @@ func updateLabels(labels map[string]string) []string {
return updatedLabels
}

func GenerateKubeObject(cid string, accessKey, version string, kubeobjectrequest types.KubeObjRequest) ([]byte, error) {
func (k8s *k8sSubscriber) GenerateKubeObject(cid string, accessKey, version string, kubeobjectrequest types.KubeObjRequest) ([]byte, error) {
infraID := `{infraID: \"` + cid + `\", version: \"` + version + `\", accessKey: \"` + accessKey + `\"}`
kubeObj, err := GetKubernetesObjects(kubeobjectrequest)
kubeObj, err := k8s.GetKubernetesObjects(kubeobjectrequest)
if err != nil {
return nil, err
}
processed, err := graphql.MarshalGQLData(kubeObj)
processed, err := k8s.gqlSubscriberServer.MarshalGQLData(kubeObj)
if err != nil {
return nil, err
}
@@ -137,15 +135,15 @@ func GenerateKubeObject(cid string, accessKey, version string, kubeobjectrequest
}

// SendKubeObjects generates graphql mutation to send kubernetes objects data to graphql server
func SendKubeObjects(infraData map[string]string, kubeobjectrequest types.KubeObjRequest) error {
func (k8s *k8sSubscriber) SendKubeObjects(infraData map[string]string, kubeobjectrequest types.KubeObjRequest) error {
// generate graphql payload
payload, err := GenerateKubeObject(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], kubeobjectrequest)
payload, err := k8s.GenerateKubeObject(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], kubeobjectrequest)
if err != nil {
logrus.WithError(err).Print("Error while getting KubeObject Data")
return err
}

body, err := graphql.SendRequest(infraData["SERVER_ADDR"], payload)
body, err := k8s.gqlSubscriberServer.SendRequest(infraData["SERVER_ADDR"], payload)
if err != nil {
logrus.Print(err.Error())
return err
25 changes: 12 additions & 13 deletions chaoscenter/subscriber/pkg/k8s/operations.go
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@ import (

"k8s.io/apimachinery/pkg/labels"

"subscriber/pkg/graphql"
"subscriber/pkg/types"

yaml_converter "github.com/ghodss/yaml"
@@ -47,12 +46,12 @@ var (
dr dynamic.ResourceInterface
)

func CheckComponentStatus(componentEnv string) error {
func (k8s *k8sSubscriber) CheckComponentStatus(componentEnv string) error {
if componentEnv == "" {
return errors.New("components not found in infra config")
}

clientset, err := GetGenericK8sClient()
clientset, err := k8s.GetGenericK8sClient()
if err != nil {
return err
}
@@ -71,7 +70,7 @@ func CheckComponentStatus(componentEnv string) error {

// add all agent components to waitgroup
wait.Add(1)
go checkDeploymentStatus(&components, clientset, &wait)
go k8s.checkDeploymentStatus(&components, clientset, &wait)

wait.Wait()
if !components.LiveStatus {
@@ -80,7 +79,7 @@ func CheckComponentStatus(componentEnv string) error {
return nil
}

func checkDeploymentStatus(components *InfraComponents, clientset *kubernetes.Clientset, wait *sync.WaitGroup) {
func (k8s *k8sSubscriber) checkDeploymentStatus(components *InfraComponents, clientset *kubernetes.Clientset, wait *sync.WaitGroup) {
ctx := context.TODO()
downCount := 0
retries := 0
@@ -128,8 +127,8 @@ func checkDeploymentStatus(components *InfraComponents, clientset *kubernetes.Cl
components.LiveStatus = false
}

func IsAgentConfirmed() (bool, string, error) {
clientset, err := GetGenericK8sClient()
func (k8s *k8sSubscriber) IsAgentConfirmed() (bool, string, error) {
clientset, err := k8s.GetGenericK8sClient()
if err != nil {
return false, "", err
}
@@ -156,8 +155,8 @@ func IsAgentConfirmed() (bool, string, error) {
}

// AgentRegister function creates litmus-portal config map in the litmus namespace
func AgentRegister(infraData map[string]string) (bool, error) {
clientset, err := GetGenericK8sClient()
func (k8s *k8sSubscriber) AgentRegister(infraData map[string]string) (bool, error) {
clientset, err := k8s.GetGenericK8sClient()
if err != nil {
return false, err
}
@@ -306,15 +305,15 @@ func addCustomLabels(obj *unstructured.Unstructured, customLabels map[string]str
}

// AgentOperations This function handles agent operations
func AgentOperations(infraAction types.Action) (*unstructured.Unstructured, error) {
func (k8s *k8sSubscriber) AgentOperations(infraAction types.Action) (*unstructured.Unstructured, error) {
// Converting JSON to YAML and store it in yamlStr variable
yamlStr, err := yaml_converter.JSONToYAML([]byte(infraAction.K8SManifest))
if err != nil {
return nil, err
}

// Getting dynamic and discovery client
discoveryClient, dynamicClient, err := GetDynamicAndDiscoveryClient()
discoveryClient, dynamicClient, err := k8s.GetDynamicAndDiscoveryClient()
if err != nil {
return nil, err
}
@@ -350,9 +349,9 @@ func AgentOperations(infraAction types.Action) (*unstructured.Unstructured, erro
return applyRequest(infraAction.RequestType, obj)
}

func AgentConfirm(infraData map[string]string) ([]byte, error) {
func (k8s *k8sSubscriber) AgentConfirm(infraData map[string]string) ([]byte, error) {
payload := `{"query":"mutation{ confirmInfraRegistration(request: {infraID: \"` + infraData["INFRA_ID"] + `\", version: \"` + infraData["VERSION"] + `\", accessKey: \"` + infraData["ACCESS_KEY"] + `\"}){isInfraConfirmed newAccessKey infraID}}"}`
resp, err := graphql.SendRequest(infraData["SERVER_ADDR"], []byte(payload))
resp, err := k8s.gqlSubscriberServer.SendRequest(infraData["SERVER_ADDR"], []byte(payload))
if err != nil {
return nil, err
}
24 changes: 24 additions & 0 deletions chaoscenter/subscriber/pkg/requests/definations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package requests

import (
"subscriber/pkg/k8s"
"subscriber/pkg/types"
"subscriber/pkg/utils"
)

type SubscriberRequests interface {
AgentConnect(infraData map[string]string)
RequestProcessor(infraData map[string]string, r types.RawData) error
}

type subscriberRequests struct {
subscriberK8s k8s.SubscriberK8s
subscriberUtils utils.SubscriberUtils
}

func NewSubscriberRequests(subscriberK8s k8s.SubscriberK8s, subscriberUtils utils.SubscriberUtils) SubscriberRequests {
return &subscriberRequests{
subscriberK8s: subscriberK8s,
subscriberUtils: subscriberUtils,
}
}
16 changes: 7 additions & 9 deletions chaoscenter/subscriber/pkg/requests/webhook.go
Original file line number Diff line number Diff line change
@@ -7,15 +7,13 @@ import (
"strings"
"time"

"subscriber/pkg/k8s"
"subscriber/pkg/types"
"subscriber/pkg/utils"

"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
)

func AgentConnect(infraData map[string]string) {
func (req *subscriberRequests) AgentConnect(infraData map[string]string) {
query := `{"query":"subscription {\n infraConnect(request: {infraID: \"` + infraData["INFRA_ID"] + `\", version: \"` + infraData["VERSION"] + `\", accessKey: \"` + infraData["ACCESS_KEY"] + `\"}) {\n action{\n k8sManifest,\n externalData,\n requestID\n requestType\n username\n namespace\n }\n }\n}\n"}`
serverURL, err := url.Parse(infraData["SERVER_ADDR"])
if err != nil {
@@ -92,14 +90,14 @@ func AgentConnect(infraData map[string]string) {
continue
}

err = RequestProcessor(infraData, r)
err = req.RequestProcessor(infraData, r)
if err != nil {
logrus.WithError(err).Error("Error on processing request")
}
}
}

func RequestProcessor(infraData map[string]string, r types.RawData) error {
func (req *subscriberRequests) RequestProcessor(infraData map[string]string, r types.RawData) error {
if strings.Index("kubeobject kubeobjects", strings.ToLower(r.Payload.Data.InfraConnect.Action.RequestType)) >= 0 {
KubeObjRequest := types.KubeObjRequest{
RequestID: r.Payload.Data.InfraConnect.Action.RequestID,
@@ -110,7 +108,7 @@ func RequestProcessor(infraData map[string]string, r types.RawData) error {
return errors.New("failed to json unmarshal: " + err.Error())
}

err = k8s.SendKubeObjects(infraData, KubeObjRequest)
err = req.subscriberK8s.SendKubeObjects(infraData, KubeObjRequest)
if err != nil {
return errors.New("error getting kubernetes object data: " + err.Error())
}
@@ -125,15 +123,15 @@ func RequestProcessor(infraData map[string]string, r types.RawData) error {
}

logrus.Print("Log Request: ", r.Payload.Data.InfraConnect.Action.ExternalData)
k8s.SendPodLogs(infraData, podRequest)
req.subscriberK8s.SendPodLogs(infraData, podRequest)
} else if strings.Index("create update delete get", strings.ToLower(r.Payload.Data.InfraConnect.Action.RequestType)) >= 0 {
_, err := k8s.AgentOperations(r.Payload.Data.InfraConnect.Action)
_, err := req.subscriberK8s.AgentOperations(r.Payload.Data.InfraConnect.Action)
if err != nil {
return errors.New("error performing infra operation: " + err.Error())
}
} else if strings.Index("workflow_delete workflow_run_delete workflow_run_stop ", strings.ToLower(r.Payload.Data.InfraConnect.Action.RequestType)) >= 0 {

err := utils.WorkflowRequest(infraData, r.Payload.Data.InfraConnect.Action.RequestType, r.Payload.Data.InfraConnect.Action.ExternalData, r.Payload.Data.InfraConnect.Action.Username)
err := req.subscriberUtils.WorkflowRequest(infraData, r.Payload.Data.InfraConnect.Action.RequestType, r.Payload.Data.InfraConnect.Action.ExternalData, r.Payload.Data.InfraConnect.Action.Username)
if err != nil {
return errors.New("error performing events operation: " + err.Error())
}
23 changes: 23 additions & 0 deletions chaoscenter/subscriber/pkg/utils/definations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package utils

import (
"subscriber/pkg/events"
"subscriber/pkg/k8s"
)

type SubscriberUtils interface {
WorkflowRequest(agentData map[string]string, requestType string, externalData string, uuid string) error
DeleteWorkflow(wfname string, agentData map[string]string) error
}

type subscriberUtils struct {
subscriberEventOperations events.SubscriberEvents
subscriberK8s k8s.SubscriberK8s
}

func NewSubscriberUtils(subscriberEventOperations events.SubscriberEvents, subscriberK8s k8s.SubscriberK8s) SubscriberUtils {
return &subscriberUtils{
subscriberEventOperations: subscriberEventOperations,
subscriberK8s: subscriberK8s,
}
}
24 changes: 11 additions & 13 deletions chaoscenter/subscriber/pkg/utils/workflow.go
Original file line number Diff line number Diff line change
@@ -2,54 +2,52 @@ package utils

import (
"context"
"subscriber/pkg/events"
"subscriber/pkg/k8s"

wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func WorkflowRequest(agentData map[string]string, requestType string, externalData string, uuid string) error {
func (utils *subscriberUtils) WorkflowRequest(agentData map[string]string, requestType string, externalData string, uuid string) error {
if requestType == "workflow_delete" {
wfOb, err := events.ListWorkflowObject(externalData)
wfOb, err := utils.subscriberEventOperations.ListWorkflowObject(externalData)
if err != nil {
return err
}
for _, wfs := range wfOb.Items {
uid := string(wfs.UID)
err = events.StopChaosEngineState(agentData["AGENT_NAMESPACE"], &uid)
err = utils.subscriberEventOperations.StopChaosEngineState(agentData["AGENT_NAMESPACE"], &uid)
if err != nil {
logrus.Info("failed to stop chaosEngine for : ", wfs.Name, " namespace: ", wfs.Namespace)
}
err = DeleteWorkflow(wfs.Name, agentData)
err = utils.DeleteWorkflow(wfs.Name, agentData)
if err != nil {
logrus.Info("failed to delete workflow: ", wfs.Name, " namespace: ", wfs.Namespace)
}
logrus.Info("events delete name: ", wfs.Name, " namespace: ", wfs.Namespace)
}
} else if requestType == "workflow_run_delete" {
wfOb, err := events.GetWorkflowObj(externalData)
wfOb, err := utils.subscriberEventOperations.GetWorkflowObj(externalData)
if err != nil {
return err
}

err = DeleteWorkflow(wfOb.Name, agentData)
err = utils.DeleteWorkflow(wfOb.Name, agentData)
if err != nil {
return err
}

logrus.Info("events delete name: ", wfOb.Name, "namespace: ", wfOb.Namespace)
} else if requestType == "workflow_run_stop" {
wfOb, err := events.GetWorkflowObj(externalData)
wfOb, err := utils.subscriberEventOperations.GetWorkflowObj(externalData)
if err != nil {
return err
}
err = events.StopChaosEngineState(agentData["INFRA_NAMESPACE"], &externalData)
err = utils.subscriberEventOperations.StopChaosEngineState(agentData["INFRA_NAMESPACE"], &externalData)
if err != nil {
logrus.Info("failed to stop chaosEngine for : ", wfOb.Name, " namespace: ", wfOb.Namespace, " : ", err)
}
err = events.StopWorkflow(wfOb.Name, wfOb.Namespace)
err = utils.subscriberEventOperations.StopWorkflow(wfOb.Name, wfOb.Namespace)
if err != nil {
logrus.Info("failed to stop experiment: ", wfOb.Name, " namespace: ", wfOb.Namespace, " : ", err)
}
@@ -60,9 +58,9 @@ func WorkflowRequest(agentData map[string]string, requestType string, externalDa
return nil
}

func DeleteWorkflow(wfname string, agentData map[string]string) error {
func (utils *subscriberUtils) DeleteWorkflow(wfname string, agentData map[string]string) error {
ctx := context.TODO()
conf, err := k8s.GetKubeConfig()
conf, err := utils.subscriberK8s.GetKubeConfig()
if err != nil {
return err
}
28 changes: 20 additions & 8 deletions chaoscenter/subscriber/subscriber.go
Original file line number Diff line number Diff line change
@@ -15,7 +15,9 @@ import (
"github.com/gorilla/websocket"

"subscriber/pkg/events"
"subscriber/pkg/graphql"
"subscriber/pkg/requests"
"subscriber/pkg/utils"

"github.com/kelseyhightower/envconfig"

@@ -62,6 +64,9 @@ func init() {

var c Config

subscriberGraphql := graphql.NewSubscriberGql()
subscriberK8s := k8s.NewK8sSubscriber(subscriberGraphql)

err := envconfig.Process("", &c)
if err != nil {
logrus.Fatal(err)
@@ -86,22 +91,22 @@ func init() {
flag.Parse()

// check agent component status
err = k8s.CheckComponentStatus(infraData["COMPONENTS"])
err = subscriberK8s.CheckComponentStatus(infraData["COMPONENTS"])
if err != nil {
logrus.Fatal(err)
}

logrus.Info("Starting the subscriber")

isConfirmed, newKey, err := k8s.IsAgentConfirmed()
isConfirmed, newKey, err := subscriberK8s.IsAgentConfirmed()
if err != nil {
logrus.WithError(err).Fatal("Failed to check agent confirmed status")
}

if isConfirmed {
infraData["ACCESS_KEY"] = newKey
} else if !isConfirmed {
infraConfirmByte, err := k8s.AgentConfirm(infraData)
infraConfirmByte, err := subscriberK8s.AgentConfirm(infraData)
if err != nil {
logrus.WithError(err).WithField("data", string(infraConfirmByte)).Fatal("Failed to confirm agent")
}
@@ -116,7 +121,7 @@ func init() {
infraData["ACCESS_KEY"] = infraConfirmInterface.Data.InfraConfirm.NewAccessKey
infraData["IS_INFRA_CONFIRMED"] = "true"

_, err = k8s.AgentRegister(infraData)
_, err = subscriberK8s.AgentRegister(infraData)
if err != nil {
logrus.Fatal(err)
}
@@ -132,16 +137,23 @@ func main() {
sigCh := make(chan os.Signal)
stream := make(chan types.WorkflowEvent, 10)

subscriberGraphql := graphql.NewSubscriberGql()
subscriberK8s := k8s.NewK8sSubscriber(subscriberGraphql)
subscriberEvents := events.NewSubscriberEventsOperator(subscriberGraphql, subscriberK8s)
subscriberUtils := utils.NewSubscriberUtils(subscriberEvents, subscriberK8s)
subscriberEventOperations := events.NewSubscriberEventsOperator(subscriberGraphql, subscriberK8s)
subscriberRequests := requests.NewSubscriberRequests(subscriberK8s, subscriberUtils)
//start events event watcher
events.WorkflowEventWatcher(stopCh, stream, infraData)

subscriberEventOperations.WorkflowEventWatcher(stopCh, stream, infraData)

//start events event watcher
events.ChaosEventWatcher(stopCh, stream, infraData)
subscriberEventOperations.ChaosEventWatcher(stopCh, stream, infraData)
//streams the event data to graphql server
go events.WorkflowUpdates(infraData, stream)
go subscriberEventOperations.WorkflowUpdates(infraData, stream)

// listen for agent actions
go requests.AgentConnect(infraData)
go subscriberRequests.AgentConnect(infraData)

signal.Notify(sigCh, os.Kill, os.Interrupt)
<-sigCh