Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add kafka resource type #109

Merged
merged 7 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/goto/entropy/modules"
"github.com/goto/entropy/modules/firehose"
"github.com/goto/entropy/modules/job"
"github.com/goto/entropy/modules/kafka"
"github.com/goto/entropy/modules/kubernetes"
"github.com/goto/entropy/pkg/logger"
"github.com/goto/entropy/pkg/telemetry"
Expand Down Expand Up @@ -90,6 +91,7 @@ func setupRegistry() module.Registry {
kubernetes.Module,
firehose.Module,
job.Module,
kafka.Module,
}

registry := &modules.Registry{}
Expand Down
8 changes: 4 additions & 4 deletions internal/store/postgres/resource_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
)

const listResourceByFilterQuery = `SELECT r.id, r.urn, r.kind, r.name, r.project, r.created_at, r.updated_at, r.state_status, r.state_output, r.state_module_data, r.state_next_sync, r.state_sync_result, r.created_by, r.updated_by,
array_agg(rt.tag)::text[] AS tags,
jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies
COALESCE(NULLIF(array_agg(rt.tag), '{NULL}'), '{}')::text[] AS tags,
mabdh marked this conversation as resolved.
Show resolved Hide resolved
jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies
FROM resources r
LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id
LEFT JOIN resources d ON rd.depends_on = d.id
Expand All @@ -28,8 +28,8 @@ OFFSET $4
`

const listResourceWithSpecConfigsByFilterQuery = `SELECT r.id, r.urn, r.kind, r.name, r.project, r.created_at, r.updated_at, r.spec_configs, r.state_status, r.state_output, r.state_module_data, r.state_next_sync, r.state_sync_result, r.created_by, r.updated_by,
array_agg(rt.tag)::text[] AS tags,
jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies
COALESCE(NULLIF(array_agg(rt.tag), '{NULL}'), '{}')::text[] AS tags,
jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies
FROM resources r
LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id
LEFT JOIN resources d ON rd.depends_on = d.id
Expand Down
74 changes: 74 additions & 0 deletions modules/kafka/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package kafka

import (
_ "embed"
"encoding/json"

"github.com/goto/entropy/core/resource"
"github.com/goto/entropy/pkg/errors"
"github.com/goto/entropy/pkg/validator"
)

var (
//go:embed schema/config.json
configSchemaRaw []byte
validateConfig = validator.FromJSONSchema(configSchemaRaw)
)

type Config struct {
Entity string `json:"entity,omitempty"`
Environment string `json:"environment,omitempty"`
Landscape string `json:"landscape,omitempty"`
Organization string `json:"organization,omitempty"`
AdvertiseMode AdvertiseMode `json:"advertise_mode"`
Brokers []Broker `json:"brokers,omitempty"`
Type string `json:"type"`
}

type AdvertiseMode struct {
Host string `json:"host"`
Address string `json:"address"`
}

type Broker struct {
Name string `json:"name"`
Host string `json:"host"`
Address string `json:"address"`
}

func readConfig(res resource.Resource, confJSON json.RawMessage, dc driverConf) (*Config, error) {
var resCfg, cfg Config

if err := json.Unmarshal(confJSON, &cfg); err != nil {
return nil, errors.ErrInvalid.WithMsgf("invalid config json").WithCausef(err.Error())
mabdh marked this conversation as resolved.
Show resolved Hide resolved
}

if res.Spec.Configs != nil {
if err := json.Unmarshal(res.Spec.Configs, &resCfg); err != nil {
return nil, errors.ErrInvalid.WithMsgf("invalid config json").WithCausef(err.Error())
mabdh marked this conversation as resolved.
Show resolved Hide resolved
}
}

if cfg.Type == "" {
if resCfg.Type != "" {
cfg.Type = resCfg.Type
} else {
cfg.Type = dc.Type
}
}

if cfg.Brokers == nil {
cfg.Brokers = resCfg.Brokers
}

newConfJSON, err := json.Marshal(cfg)
if err != nil {
return nil, errors.ErrInvalid.WithMsgf("invalid config json").WithCausef(err.Error())
mabdh marked this conversation as resolved.
Show resolved Hide resolved
}

if err := validateConfig(newConfJSON); err != nil {
return nil, err
}

return &cfg, nil
}
94 changes: 94 additions & 0 deletions modules/kafka/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package kafka

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/goto/entropy/core/module"
"github.com/goto/entropy/core/resource"
"github.com/goto/entropy/modules"
)

var defaultDriverConf = driverConf{
Type: "source",
}

type kafkaDriver struct {
conf driverConf
}

type Output struct {
URL string `json:"url"`
}

type driverConf struct {
Type string `json:"type"`
}

func (m *kafkaDriver) Plan(ctx context.Context, res module.ExpandedResource,
act module.ActionRequest,
) (*resource.Resource, error) {
cfg, err := readConfig(res.Resource, act.Params, m.conf)
if err != nil {
return nil, err
}

res.Resource.Spec = resource.Spec{
Configs: modules.MustJSON(cfg),
Dependencies: nil,
}

res.Resource.State = resource.State{
Status: resource.StatusCompleted,
Output: modules.MustJSON(Output{
URL: mapUrl(cfg),
}),
}

return &res.Resource, nil
}

func (*kafkaDriver) Sync(_ context.Context, res module.ExpandedResource) (*resource.State, error) {
return &resource.State{
Status: resource.StatusCompleted,
Output: res.Resource.State.Output,
ModuleData: nil,
}, nil
}

func (m *kafkaDriver) Output(ctx context.Context, res module.ExpandedResource) (json.RawMessage, error) {
cfg, err := readConfig(res.Resource, res.Resource.Spec.Configs, m.conf)
if err != nil {
return nil, err
}

return modules.MustJSON(Output{
URL: mapUrl(cfg),
}), nil
}

func mapUrl(cfg *Config) string {
var mode, port string
if cfg.AdvertiseMode.Address != "" {
mode = "address"
port = cfg.AdvertiseMode.Address
} else {
mode = "host"
port = cfg.AdvertiseMode.Host
}

var urls []string
for _, broker := range cfg.Brokers {
var addr string
if mode == "address" {
addr = broker.Address
} else {
addr = broker.Host
}
urls = append(urls, fmt.Sprintf("%s:%s", addr, port))
}

return strings.Join(urls, ",")
}
24 changes: 24 additions & 0 deletions modules/kafka/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package kafka

import (
"encoding/json"

"github.com/goto/entropy/core/module"
)

var Module = module.Descriptor{
Kind: "kafka",
Actions: []module.ActionDesc{
{
Name: module.CreateAction,
},
{
Name: module.UpdateAction,
},
},
DriverFactory: func(_ json.RawMessage) (module.Driver, error) {
return &kafkaDriver{
conf: defaultDriverConf,
}, nil
},
}
53 changes: 53 additions & 0 deletions modules/kafka/schema/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["type", "brokers"],
"properties": {
"type": {
"type": "string"
},
"advertise_mode": {
"type": "object",
"additionalProperties": true,
"properties": {
"host": {
"type": "string"
},
"address": {
"type": "string"
}
}
},
"brokers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"host": {
"type": "string"
},
"address": {
"type": "string"
}
},
"required": ["name", "host", "address"]
}
},
"entity": {
"type": "string"
},
"environment": {
"type": "string"
},
"landscape": {
"type": "string"
},
"organization": {
"type": "string"
}
}
}
2 changes: 1 addition & 1 deletion test/e2e_test/firehose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type FirehoseTestSuite struct {
}

func (s *FirehoseTestSuite) SetupTest() {
s.ctx, s.moduleClient, s.resourceClient, s.appConfig, s.pool, s.resource, s.kubeProvider, s.cancelModuleClient, s.cancelResourceClient, s.cancel = testbench.SetupTests(s.T(), true)
s.ctx, s.moduleClient, s.resourceClient, s.appConfig, s.pool, s.resource, s.kubeProvider, s.cancelModuleClient, s.cancelResourceClient, s.cancel = testbench.SetupTests(s.T(), true, true)

modules, err := s.moduleClient.ListModules(s.ctx, &entropyv1beta1.ListModulesRequest{})
s.Require().NoError(err)
Expand Down
Loading
Loading