Skip to content

Commit

Permalink
Merge pull request #1 from avtarOPS/ingestion
Browse files Browse the repository at this point in the history
Ingestion Controller Updates
  • Loading branch information
AdheipSingh authored Dec 26, 2023
2 parents 7488f46 + 4822945 commit 2d1b7ff
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 59 deletions.
19 changes: 19 additions & 0 deletions controllers/druid/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package druid

import (
"encoding/json"
"fmt"
"os"
"reflect"
"strconv"
Expand Down Expand Up @@ -77,3 +79,20 @@ func Str2Int(s string) int {
}
return i
}

func IsEqualJson(s1, s2 string) (bool, error) {
var o1 interface{}
var o2 interface{}

var err error
err = json.Unmarshal([]byte(s1), &o1)
if err != nil {
return false, fmt.Errorf("error mashalling string 1 :: %s", err.Error())
}
err = json.Unmarshal([]byte(s2), &o2)
if err != nil {
return false, fmt.Errorf("error mashalling string 2 :: %s", err.Error())
}

return reflect.DeepEqual(o1, o2), nil
}
22 changes: 8 additions & 14 deletions controllers/ingestion/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,18 @@ import (

// DruidHTTP interface
type DruidHTTP interface {
Do() (*Response, error)
Do(method, url string, body []byte) (*Response, error)
}

// HTTP client
type Client struct {
Method string
URL string
HTTPClient http.Client
Body []byte
Auth Auth
type DruidClient struct {
HTTPClient *http.Client
Auth *Auth
}

func NewHTTPClient(method, url string, client http.Client, body []byte, auth Auth) DruidHTTP {
newClient := &Client{
Method: method,
URL: url,
func NewHTTPClient(client *http.Client, auth *Auth) DruidHTTP {
newClient := &DruidClient{
HTTPClient: client,
Body: body,
Auth: auth,
}

Expand All @@ -51,9 +45,9 @@ type Response struct {
}

// Do method to be used schema and tenant controller.
func (c *Client) Do() (*Response, error) {
func (c *DruidClient) Do(Method, url string, body []byte) (*Response, error) {

req, err := http.NewRequest(c.Method, c.URL, bytes.NewBuffer(c.Body))
req, err := http.NewRequest(Method, url, bytes.NewBuffer(body))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/ingestion/ingestion_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (r *DruidIngestionReconciler) Reconcile(ctx context.Context, req ctrl.Reque
logr := log.FromContext(ctx)

druidIngestionCR := &v1alpha1.DruidIngestion{}
err := r.Get(context.TODO(), req.NamespacedName, druidIngestionCR)
err := r.Get(ctx, req.NamespacedName, druidIngestionCR)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
Expand Down
62 changes: 18 additions & 44 deletions controllers/ingestion/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import (
"errors"
"fmt"
"net/http"
"reflect"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/datainfrahq/druid-operator/apis/druid/v1alpha1"
"github.com/datainfrahq/druid-operator/controllers/druid"
internalhttp "github.com/datainfrahq/druid-operator/controllers/ingestion/http"
"github.com/datainfrahq/operator-runtime/builder"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -43,7 +42,6 @@ const (
)

func (r *DruidIngestionReconciler) do(ctx context.Context, di *v1alpha1.DruidIngestion) error {

basicAuth, err := r.getAuthCreds(ctx, di)
if err != nil {
return err
Expand Down Expand Up @@ -81,14 +79,12 @@ func (r *DruidIngestionReconciler) do(ctx context.Context, di *v1alpha1.DruidIng
return err
}

http := internalhttp.NewHTTPClient(
http.MethodPost,
getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, di.Status.TaskId, true),
http.Client{},
[]byte{},
internalhttp.Auth{BasicAuth: basicAuth},
posthttp := internalhttp.NewHTTPClient(
&http.Client{},
&internalhttp.Auth{BasicAuth: basicAuth},
)
respShutDownTask, err := http.Do()

respShutDownTask, err := posthttp.Do(http.MethodPost, getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, di.Status.TaskId, true), []byte{})
if err != nil {
return err
}
Expand Down Expand Up @@ -131,16 +127,14 @@ func (r *DruidIngestionReconciler) CreateOrUpdate(
// if does not exist create task

postHttp := internalhttp.NewHTTPClient(
http.MethodPost,
getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false),
http.Client{},
[]byte(di.Spec.Ingestion.Spec),
auth,
&http.Client{},
&auth,
)

respCreateTask, err := postHttp.Do()
respCreateTask, err := postHttp.Do(http.MethodPost, getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false), []byte(di.Spec.Ingestion.Spec))

if err != nil {
return controllerutil.OperationResultNone, nil
return controllerutil.OperationResultNone, err
}

// if success patch status
Expand Down Expand Up @@ -198,21 +192,18 @@ func (r *DruidIngestionReconciler) CreateOrUpdate(
}
} else {
// compare the state
ok, err := isEqualJson(di.Status.CurrentIngestionSpec, di.Spec.Ingestion.Spec)
ok, err := druid.IsEqualJson(di.Status.CurrentIngestionSpec, di.Spec.Ingestion.Spec)
if err != nil {
return controllerutil.OperationResultNone, err
}

if !ok {
postHttp := internalhttp.NewHTTPClient(
http.MethodPost,
getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false),
http.Client{},
[]byte(di.Spec.Ingestion.Spec),
auth,
&http.Client{},
&auth,
)

respUpdateSpec, err := postHttp.Do()
respUpdateSpec, err := postHttp.Do(http.MethodPost, getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false), []byte(di.Spec.Ingestion.Spec))
if err != nil {
return controllerutil.OperationResultNone, err
}
Expand Down Expand Up @@ -361,6 +352,7 @@ func (r *DruidIngestionReconciler) getRouterSvcUrl(namespace, druidClusterName s

func (r *DruidIngestionReconciler) getAuthCreds(ctx context.Context, di *v1alpha1.DruidIngestion) (internalhttp.BasicAuth, error) {
druid := v1alpha1.Druid{}
// check if the mentioned druid cluster exists
if err := r.Client.Get(ctx, types.NamespacedName{
Namespace: di.Namespace,
Name: di.Spec.DruidClusterName,
Expand All @@ -369,7 +361,7 @@ func (r *DruidIngestionReconciler) getAuthCreds(ctx context.Context, di *v1alpha
); err != nil {
return internalhttp.BasicAuth{}, err
}

// check if the mentioned secret exists
if di.Spec.Auth != (v1alpha1.Auth{}) {
secret := v1.Secret{}
if err := r.Client.Get(ctx, types.NamespacedName{
Expand All @@ -380,7 +372,6 @@ func (r *DruidIngestionReconciler) getAuthCreds(ctx context.Context, di *v1alpha
); err != nil {
return internalhttp.BasicAuth{}, err
}

creds := internalhttp.BasicAuth{
UserName: string(secret.Data[OperatorUserName]),
Password: string(secret.Data[OperatorPassword]),
Expand Down Expand Up @@ -427,20 +418,3 @@ func patchStatus(ctx context.Context, c client.Client, obj client.Object, transf
}
return obj, VerbPatched, nil
}

func isEqualJson(s1, s2 string) (bool, error) {
var o1 interface{}
var o2 interface{}

var err error
err = json.Unmarshal([]byte(s1), &o1)
if err != nil {
return false, fmt.Errorf("Error mashalling string 1 :: %s", err.Error())
}
err = json.Unmarshal([]byte(s2), &o2)
if err != nil {
return false, fmt.Errorf("Error mashalling string 2 :: %s", err.Error())
}

return reflect.DeepEqual(o1, o2), nil
}

0 comments on commit 2d1b7ff

Please sign in to comment.