Skip to content

Commit

Permalink
MG-2142 - Consume Things connect/disconnect event in Bootstrap (absma…
Browse files Browse the repository at this point in the history
…ch#2192)

Signed-off-by: JeffMboya <[email protected]>
  • Loading branch information
JeffMboya authored Jun 24, 2024
1 parent 424aa7c commit 94053f3
Show file tree
Hide file tree
Showing 17 changed files with 590 additions and 226 deletions.
18 changes: 18 additions & 0 deletions bootstrap/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,24 @@ func (lm *loggingMiddleware) RemoveChannelHandler(ctx context.Context, id string
return lm.svc.RemoveChannelHandler(ctx, id)
}

func (lm *loggingMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("channel_id", channelID),
slog.String("thing_id", thingID),
}
if err != nil {
args = append(args, slog.Any("error", err))
lm.logger.Warn("Connect thing handler failed to complete successfully", args...)
return
}
lm.logger.Info("Connect thing handler completed successfully", args...)
}(time.Now())

return lm.svc.ConnectThingHandler(ctx, channelID, thingID)
}

func (lm *loggingMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
args := []any{
Expand Down
10 changes: 10 additions & 0 deletions bootstrap/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ func (mm *metricsMiddleware) RemoveChannelHandler(ctx context.Context, id string
return mm.svc.RemoveChannelHandler(ctx, id)
}

// ConnectThingHandler instruments ConnectThingHandler method with metrics.
func (mm *metricsMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
mm.counter.With("method", "connect_thing_handler").Add(1)
mm.latency.With("method", "connect_thing_handler").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.ConnectThingHandler(ctx, channelID, thingID)
}

// DisconnectThingHandler instruments DisconnectThingHandler method with metrics.
func (mm *metricsMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
Expand Down
6 changes: 4 additions & 2 deletions bootstrap/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ type ConfigRepository interface {
// RemoveChannel removes channel with the given ID.
RemoveChannel(ctx context.Context, id string) error

// DisconnectHandler changes state of the Config when the corresponding Thing is
// disconnected from the Channel.
// ConnectThing changes state of the Config when the corresponding Thing is connected to the Channel.
ConnectThing(ctx context.Context, channelID, thingID string) error

// DisconnectThing changes state of the Config when the corresponding Thing is disconnected from the Channel.
DisconnectThing(ctx context.Context, channelID, thingID string) error
}
4 changes: 2 additions & 2 deletions bootstrap/events/consumer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type updateChannelEvent struct {
}

// Connection event is either connect or disconnect event.
type disconnectEvent struct {
thingID string
type connectionEvent struct {
thingIDs []string
channelID string
}
73 changes: 67 additions & 6 deletions bootstrap/events/consumer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@ import (
"time"

"github.com/absmach/magistrala/bootstrap"
svcerr "github.com/absmach/magistrala/pkg/errors/service"
"github.com/absmach/magistrala/pkg/events"
)

const (
thingRemove = "thing.remove"
thingDisconnect = "policy.delete"
thingConnect = "group.assign"
thingDisconnect = "group.unassign"

channelPrefix = "group."
channelUpdate = channelPrefix + "update"
channelRemove = channelPrefix + "remove"

memberKind = "things"
relation = "group"
)

type eventHandler struct {
Expand All @@ -42,9 +47,32 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
case thingRemove:
rte := decodeRemoveThing(msg)
err = es.svc.RemoveConfigHandler(ctx, rte.id)
case thingConnect:
cte := decodeConnectThing(msg)
if cte.channelID == "" || len(cte.thingIDs) == 0 {
return svcerr.ErrMalformedEntity
}
for _, thingID := range cte.thingIDs {
if thingID == "" {
return svcerr.ErrMalformedEntity
}
if err := es.svc.ConnectThingHandler(ctx, cte.channelID, thingID); err != nil {
return err
}
}
case thingDisconnect:
dte := decodeDisconnectThing(msg)
err = es.svc.DisconnectThingHandler(ctx, dte.channelID, dte.thingID)
if dte.channelID == "" || len(dte.thingIDs) == 0 {
return svcerr.ErrMalformedEntity
}
for _, thingID := range dte.thingIDs {
if thingID == "" {
return svcerr.ErrMalformedEntity
}
if err = es.svc.DisconnectThingHandler(ctx, dte.channelID, thingID); err != nil {
return err
}
}
case channelUpdate:
uce := decodeUpdateChannel(msg)
err = es.handleUpdateChannel(ctx, uce)
Expand Down Expand Up @@ -87,10 +115,24 @@ func decodeRemoveChannel(event map[string]interface{}) removeEvent {
}
}

func decodeDisconnectThing(event map[string]interface{}) disconnectEvent {
return disconnectEvent{
channelID: read(event, "chan_id", ""),
thingID: read(event, "thing_id", ""),
func decodeConnectThing(event map[string]interface{}) connectionEvent {
if read(event, "memberKind", "") != memberKind && read(event, "relation", "") != relation {
return connectionEvent{}
}

return connectionEvent{
channelID: read(event, "group_id", ""),
thingIDs: ReadStringSlice(event, "member_ids"),
}
}

func decodeDisconnectThing(event map[string]interface{}) connectionEvent {
if read(event, "memberKind", "") != memberKind && read(event, "relation", "") != relation {
return connectionEvent{}
}
return connectionEvent{
channelID: read(event, "group_id", ""),
thingIDs: ReadStringSlice(event, "member_ids"),
}
}

Expand All @@ -114,6 +156,25 @@ func read(event map[string]interface{}, key, def string) string {
return val
}

// ReadStringSlice reads string slice from event map.
// If value is not a string slice, returns empty slice.
func ReadStringSlice(event map[string]interface{}, key string) []string {
var res []string

vals, ok := event[key].([]interface{})
if !ok {
return res
}

for _, v := range vals {
if s, ok := v.(string); ok {
res = append(res, s)
}
}

return res
}

func readTime(event map[string]interface{}, key string, def time.Time) time.Time {
val, ok := event[key].(time.Time)
if !ok {
Expand Down
14 changes: 14 additions & 0 deletions bootstrap/events/producer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
thingBootstrap = thingPrefix + "bootstrap"
thingStateChange = thingPrefix + "change_state"
thingUpdateConnections = thingPrefix + "update_connections"
thingConnect = thingPrefix + "connect"
thingDisconnect = thingPrefix + "disconnect"

channelPrefix = "group."
Expand Down Expand Up @@ -276,6 +277,19 @@ func (uche updateChannelHandlerEvent) Encode() (map[string]interface{}, error) {
return val, nil
}

type connectThingEvent struct {
thingID string
channelID string
}

func (cte connectThingEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"thing_id": cte.thingID,
"channel_id": cte.channelID,
"operation": thingConnect,
}, nil
}

type disconnectThingEvent struct {
thingID string
channelID string
Expand Down
17 changes: 15 additions & 2 deletions bootstrap/events/producer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,27 @@ func (es *eventStore) UpdateChannelHandler(ctx context.Context, channel bootstra
return es.Publish(ctx, ev)
}

func (es *eventStore) ConnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := es.svc.ConnectThingHandler(ctx, channelID, thingID); err != nil {
return err
}

ev := connectThingEvent{
thingID: thingID,
channelID: channelID,
}

return es.Publish(ctx, ev)
}

func (es *eventStore) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := es.svc.DisconnectThingHandler(ctx, channelID, thingID); err != nil {
return err
}

ev := disconnectThingEvent{
thingID,
channelID,
thingID: thingID,
channelID: channelID,
}

return es.Publish(ctx, ev)
Expand Down
84 changes: 83 additions & 1 deletion bootstrap/events/producer/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
thingBootstrap = thingPrefix + "bootstrap"
thingStateChange = thingPrefix + "change_state"
thingUpdateConnections = thingPrefix + "update_connections"
thingConnect = thingPrefix + "connect"
thingDisconnect = thingPrefix + "disconnect"

channelPrefix = "group."
Expand Down Expand Up @@ -1039,6 +1040,87 @@ func TestRemoveConfigHandler(t *testing.T) {
}
}

func TestConnectThingHandler(t *testing.T) {
err := redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

svc, boot, _, _ := newService(t, redisURL)

err = redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

cases := []struct {
desc string
channelID string
thingID string
err error
event map[string]interface{}
}{
{
desc: "connect thing handler successfully",
channelID: channel.ID,
thingID: "1",
err: nil,
event: map[string]interface{}{
"channel_id": channel.ID,
"thing_id": "1",
"operation": thingConnect,
"timestamp": time.Now().UnixNano(),
"occurred_at": time.Now().UnixNano(),
},
},
{
desc: "add non-existing channel handler",
channelID: "unknown",
err: nil,
event: nil,
},
{
desc: "add channel handler with empty ID",
channelID: "",
err: nil,
event: nil,
},
{
desc: "add channel handler successfully",
channelID: channel.ID,
thingID: "1",
err: nil,
event: map[string]interface{}{
"channel_id": channel.ID,
"thing_id": "1",
"operation": thingConnect,
"timestamp": time.Now().UnixNano(),
"occurred_at": time.Now().UnixNano(),
},
},
}

lastID := "0"
for _, tc := range cases {
repoCall := boot.On("ConnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err)
err := svc.ConnectThingHandler(context.Background(), tc.channelID, tc.thingID)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))

streams := redisClient.XRead(context.Background(), &redis.XReadArgs{
Streams: []string{streamID, lastID},
Count: 1,
Block: time.Second,
}).Val()

var event map[string]interface{}
if len(streams) > 0 && len(streams[0].Messages) > 0 {
msg := streams[0].Messages[0]
event = msg.Values
event["timestamp"] = msg.ID
lastID = msg.ID
}

test(t, tc.event, event, tc.desc)
repoCall.Unset()
}
}

func TestDisconnectThingHandler(t *testing.T) {
err := redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
Expand Down Expand Up @@ -1097,7 +1179,7 @@ func TestDisconnectThingHandler(t *testing.T) {

lastID := "0"
for _, tc := range cases {
repoCall := boot.On("DisconnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err)
repoCall := boot.On("DisconnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err)
err := svc.DisconnectThingHandler(context.Background(), tc.channelID, tc.thingID)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))

Expand Down
18 changes: 18 additions & 0 deletions bootstrap/mocks/configs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 94053f3

Please sign in to comment.