Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingestion Controller Updates #1

Merged
merged 2 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions controllers/druid/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package druid

import (
"encoding/json"
"fmt"
"os"
"reflect"
"strconv"
Expand Down Expand Up @@ -77,3 +79,20 @@ func Str2Int(s string) int {
}
return i
}

func IsEqualJson(s1, s2 string) (bool, error) {
var o1 interface{}
var o2 interface{}

var err error
err = json.Unmarshal([]byte(s1), &o1)
if err != nil {
return false, fmt.Errorf("error mashalling string 1 :: %s", err.Error())
}
err = json.Unmarshal([]byte(s2), &o2)
if err != nil {
return false, fmt.Errorf("error mashalling string 2 :: %s", err.Error())
}

return reflect.DeepEqual(o1, o2), nil
}
22 changes: 8 additions & 14 deletions controllers/ingestion/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,18 @@ import (

// DruidHTTP interface
type DruidHTTP interface {
Do() (*Response, error)
Do(method, url string, body []byte) (*Response, error)
}

// HTTP client
type Client struct {
Method string
URL string
HTTPClient http.Client
Body []byte
Auth Auth
type DruidClient struct {
HTTPClient *http.Client
Auth *Auth
}

func NewHTTPClient(method, url string, client http.Client, body []byte, auth Auth) DruidHTTP {
newClient := &Client{
Method: method,
URL: url,
func NewHTTPClient(client *http.Client, auth *Auth) DruidHTTP {
newClient := &DruidClient{
HTTPClient: client,
Body: body,
Auth: auth,
}

Expand All @@ -51,9 +45,9 @@ type Response struct {
}

// Do method to be used schema and tenant controller.
func (c *Client) Do() (*Response, error) {
func (c *DruidClient) Do(Method, url string, body []byte) (*Response, error) {

req, err := http.NewRequest(c.Method, c.URL, bytes.NewBuffer(c.Body))
req, err := http.NewRequest(Method, url, bytes.NewBuffer(body))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/ingestion/ingestion_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (r *DruidIngestionReconciler) Reconcile(ctx context.Context, req ctrl.Reque
logr := log.FromContext(ctx)

druidIngestionCR := &v1alpha1.DruidIngestion{}
err := r.Get(context.TODO(), req.NamespacedName, druidIngestionCR)
err := r.Get(ctx, req.NamespacedName, druidIngestionCR)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
Expand Down
62 changes: 18 additions & 44 deletions controllers/ingestion/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import (
"errors"
"fmt"
"net/http"
"reflect"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/datainfrahq/druid-operator/apis/druid/v1alpha1"
"github.com/datainfrahq/druid-operator/controllers/druid"
internalhttp "github.com/datainfrahq/druid-operator/controllers/ingestion/http"
"github.com/datainfrahq/operator-runtime/builder"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -43,7 +42,6 @@ const (
)

func (r *DruidIngestionReconciler) do(ctx context.Context, di *v1alpha1.DruidIngestion) error {

basicAuth, err := r.getAuthCreds(ctx, di)
if err != nil {
return err
Expand Down Expand Up @@ -81,14 +79,12 @@ func (r *DruidIngestionReconciler) do(ctx context.Context, di *v1alpha1.DruidIng
return err
}

http := internalhttp.NewHTTPClient(
http.MethodPost,
getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, di.Status.TaskId, true),
http.Client{},
[]byte{},
internalhttp.Auth{BasicAuth: basicAuth},
posthttp := internalhttp.NewHTTPClient(
&http.Client{},
&internalhttp.Auth{BasicAuth: basicAuth},
)
respShutDownTask, err := http.Do()

respShutDownTask, err := posthttp.Do(http.MethodPost, getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, di.Status.TaskId, true), []byte{})
if err != nil {
return err
}
Expand Down Expand Up @@ -131,16 +127,14 @@ func (r *DruidIngestionReconciler) CreateOrUpdate(
// if does not exist create task

postHttp := internalhttp.NewHTTPClient(
http.MethodPost,
getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false),
http.Client{},
[]byte(di.Spec.Ingestion.Spec),
auth,
&http.Client{},
&auth,
)

respCreateTask, err := postHttp.Do()
respCreateTask, err := postHttp.Do(http.MethodPost, getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false), []byte(di.Spec.Ingestion.Spec))

if err != nil {
return controllerutil.OperationResultNone, nil
return controllerutil.OperationResultNone, err
}

// if success patch status
Expand Down Expand Up @@ -198,21 +192,18 @@ func (r *DruidIngestionReconciler) CreateOrUpdate(
}
} else {
// compare the state
ok, err := isEqualJson(di.Status.CurrentIngestionSpec, di.Spec.Ingestion.Spec)
ok, err := druid.IsEqualJson(di.Status.CurrentIngestionSpec, di.Spec.Ingestion.Spec)
if err != nil {
return controllerutil.OperationResultNone, err
}

if !ok {
postHttp := internalhttp.NewHTTPClient(
http.MethodPost,
getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false),
http.Client{},
[]byte(di.Spec.Ingestion.Spec),
auth,
&http.Client{},
&auth,
)

respUpdateSpec, err := postHttp.Do()
respUpdateSpec, err := postHttp.Do(http.MethodPost, getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false), []byte(di.Spec.Ingestion.Spec))
if err != nil {
return controllerutil.OperationResultNone, err
}
Expand Down Expand Up @@ -361,6 +352,7 @@ func (r *DruidIngestionReconciler) getRouterSvcUrl(namespace, druidClusterName s

func (r *DruidIngestionReconciler) getAuthCreds(ctx context.Context, di *v1alpha1.DruidIngestion) (internalhttp.BasicAuth, error) {
druid := v1alpha1.Druid{}
// check if the mentioned druid cluster exists
if err := r.Client.Get(ctx, types.NamespacedName{
Namespace: di.Namespace,
Name: di.Spec.DruidClusterName,
Expand All @@ -369,7 +361,7 @@ func (r *DruidIngestionReconciler) getAuthCreds(ctx context.Context, di *v1alpha
); err != nil {
return internalhttp.BasicAuth{}, err
}

// check if the mentioned secret exists
if di.Spec.Auth != (v1alpha1.Auth{}) {
secret := v1.Secret{}
if err := r.Client.Get(ctx, types.NamespacedName{
Expand All @@ -380,7 +372,6 @@ func (r *DruidIngestionReconciler) getAuthCreds(ctx context.Context, di *v1alpha
); err != nil {
return internalhttp.BasicAuth{}, err
}

creds := internalhttp.BasicAuth{
UserName: string(secret.Data[OperatorUserName]),
Password: string(secret.Data[OperatorPassword]),
Expand Down Expand Up @@ -427,20 +418,3 @@ func patchStatus(ctx context.Context, c client.Client, obj client.Object, transf
}
return obj, VerbPatched, nil
}

func isEqualJson(s1, s2 string) (bool, error) {
var o1 interface{}
var o2 interface{}

var err error
err = json.Unmarshal([]byte(s1), &o1)
if err != nil {
return false, fmt.Errorf("Error mashalling string 1 :: %s", err.Error())
}
err = json.Unmarshal([]byte(s2), &o2)
if err != nil {
return false, fmt.Errorf("Error mashalling string 2 :: %s", err.Error())
}

return reflect.DeepEqual(o1, o2), nil
}