From 94053f37ee6939e4bed160dc630244fd63097396 Mon Sep 17 00:00:00 2001 From: JMboya <44696487+JeffMboya@users.noreply.github.com> Date: Mon, 24 Jun 2024 12:02:09 +0300 Subject: [PATCH] MG-2142 - Consume Things connect/disconnect event in Bootstrap (#2192) Signed-off-by: JeffMboya --- bootstrap/api/logging.go | 18 ++ bootstrap/api/metrics.go | 10 + bootstrap/configs.go | 6 +- bootstrap/events/consumer/events.go | 4 +- bootstrap/events/consumer/streams.go | 73 ++++++- bootstrap/events/producer/events.go | 14 ++ bootstrap/events/producer/streams.go | 17 +- bootstrap/events/producer/streams_test.go | 84 +++++++- bootstrap/mocks/configs.go | 18 ++ bootstrap/mocks/service.go | 26 ++- bootstrap/postgres/configs.go | 62 ++++-- bootstrap/postgres/configs_test.go | 178 ++++++++++++++- bootstrap/service.go | 15 +- bootstrap/service_test.go | 252 +++++++--------------- bootstrap/tracing/tracing.go | 11 + internal/groups/events/events.go | 16 +- internal/groups/events/streams.go | 12 +- 17 files changed, 590 insertions(+), 226 deletions(-) diff --git a/bootstrap/api/logging.go b/bootstrap/api/logging.go index eb92d7a45e..a9a5f25466 100644 --- a/bootstrap/api/logging.go +++ b/bootstrap/api/logging.go @@ -257,6 +257,24 @@ func (lm *loggingMiddleware) RemoveChannelHandler(ctx context.Context, id string return lm.svc.RemoveChannelHandler(ctx, id) } +func (lm *loggingMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) { + defer func(begin time.Time) { + args := []any{ + slog.String("duration", time.Since(begin).String()), + slog.String("channel_id", channelID), + slog.String("thing_id", thingID), + } + if err != nil { + args = append(args, slog.Any("error", err)) + lm.logger.Warn("Connect thing handler failed to complete successfully", args...) + return + } + lm.logger.Info("Connect thing handler completed successfully", args...) + }(time.Now()) + + return lm.svc.ConnectThingHandler(ctx, channelID, thingID) +} + func (lm *loggingMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) { defer func(begin time.Time) { args := []any{ diff --git a/bootstrap/api/metrics.go b/bootstrap/api/metrics.go index 8137635e3c..9e42a57c18 100644 --- a/bootstrap/api/metrics.go +++ b/bootstrap/api/metrics.go @@ -150,6 +150,16 @@ func (mm *metricsMiddleware) RemoveChannelHandler(ctx context.Context, id string return mm.svc.RemoveChannelHandler(ctx, id) } +// ConnectThingHandler instruments ConnectThingHandler method with metrics. +func (mm *metricsMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) { + defer func(begin time.Time) { + mm.counter.With("method", "connect_thing_handler").Add(1) + mm.latency.With("method", "connect_thing_handler").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return mm.svc.ConnectThingHandler(ctx, channelID, thingID) +} + // DisconnectThingHandler instruments DisconnectThingHandler method with metrics. func (mm *metricsMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) { defer func(begin time.Time) { diff --git a/bootstrap/configs.go b/bootstrap/configs.go index 6b7df9b42e..052526e913 100644 --- a/bootstrap/configs.go +++ b/bootstrap/configs.go @@ -112,7 +112,9 @@ type ConfigRepository interface { // RemoveChannel removes channel with the given ID. RemoveChannel(ctx context.Context, id string) error - // DisconnectHandler changes state of the Config when the corresponding Thing is - // disconnected from the Channel. + // ConnectThing changes state of the Config when the corresponding Thing is connected to the Channel. + ConnectThing(ctx context.Context, channelID, thingID string) error + + // DisconnectThing changes state of the Config when the corresponding Thing is disconnected from the Channel. DisconnectThing(ctx context.Context, channelID, thingID string) error } diff --git a/bootstrap/events/consumer/events.go b/bootstrap/events/consumer/events.go index 1f47340825..a3a0599650 100644 --- a/bootstrap/events/consumer/events.go +++ b/bootstrap/events/consumer/events.go @@ -18,7 +18,7 @@ type updateChannelEvent struct { } // Connection event is either connect or disconnect event. -type disconnectEvent struct { - thingID string +type connectionEvent struct { + thingIDs []string channelID string } diff --git a/bootstrap/events/consumer/streams.go b/bootstrap/events/consumer/streams.go index 10d8ff77eb..d3defda655 100644 --- a/bootstrap/events/consumer/streams.go +++ b/bootstrap/events/consumer/streams.go @@ -9,16 +9,21 @@ import ( "time" "github.com/absmach/magistrala/bootstrap" + svcerr "github.com/absmach/magistrala/pkg/errors/service" "github.com/absmach/magistrala/pkg/events" ) const ( thingRemove = "thing.remove" - thingDisconnect = "policy.delete" + thingConnect = "group.assign" + thingDisconnect = "group.unassign" channelPrefix = "group." channelUpdate = channelPrefix + "update" channelRemove = channelPrefix + "remove" + + memberKind = "things" + relation = "group" ) type eventHandler struct { @@ -42,9 +47,32 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error { case thingRemove: rte := decodeRemoveThing(msg) err = es.svc.RemoveConfigHandler(ctx, rte.id) + case thingConnect: + cte := decodeConnectThing(msg) + if cte.channelID == "" || len(cte.thingIDs) == 0 { + return svcerr.ErrMalformedEntity + } + for _, thingID := range cte.thingIDs { + if thingID == "" { + return svcerr.ErrMalformedEntity + } + if err := es.svc.ConnectThingHandler(ctx, cte.channelID, thingID); err != nil { + return err + } + } case thingDisconnect: dte := decodeDisconnectThing(msg) - err = es.svc.DisconnectThingHandler(ctx, dte.channelID, dte.thingID) + if dte.channelID == "" || len(dte.thingIDs) == 0 { + return svcerr.ErrMalformedEntity + } + for _, thingID := range dte.thingIDs { + if thingID == "" { + return svcerr.ErrMalformedEntity + } + if err = es.svc.DisconnectThingHandler(ctx, dte.channelID, thingID); err != nil { + return err + } + } case channelUpdate: uce := decodeUpdateChannel(msg) err = es.handleUpdateChannel(ctx, uce) @@ -87,10 +115,24 @@ func decodeRemoveChannel(event map[string]interface{}) removeEvent { } } -func decodeDisconnectThing(event map[string]interface{}) disconnectEvent { - return disconnectEvent{ - channelID: read(event, "chan_id", ""), - thingID: read(event, "thing_id", ""), +func decodeConnectThing(event map[string]interface{}) connectionEvent { + if read(event, "memberKind", "") != memberKind && read(event, "relation", "") != relation { + return connectionEvent{} + } + + return connectionEvent{ + channelID: read(event, "group_id", ""), + thingIDs: ReadStringSlice(event, "member_ids"), + } +} + +func decodeDisconnectThing(event map[string]interface{}) connectionEvent { + if read(event, "memberKind", "") != memberKind && read(event, "relation", "") != relation { + return connectionEvent{} + } + return connectionEvent{ + channelID: read(event, "group_id", ""), + thingIDs: ReadStringSlice(event, "member_ids"), } } @@ -114,6 +156,25 @@ func read(event map[string]interface{}, key, def string) string { return val } +// ReadStringSlice reads string slice from event map. +// If value is not a string slice, returns empty slice. +func ReadStringSlice(event map[string]interface{}, key string) []string { + var res []string + + vals, ok := event[key].([]interface{}) + if !ok { + return res + } + + for _, v := range vals { + if s, ok := v.(string); ok { + res = append(res, s) + } + } + + return res +} + func readTime(event map[string]interface{}, key string, def time.Time) time.Time { val, ok := event[key].(time.Time) if !ok { diff --git a/bootstrap/events/producer/events.go b/bootstrap/events/producer/events.go index ed15704a4d..beebb7976a 100644 --- a/bootstrap/events/producer/events.go +++ b/bootstrap/events/producer/events.go @@ -23,6 +23,7 @@ const ( thingBootstrap = thingPrefix + "bootstrap" thingStateChange = thingPrefix + "change_state" thingUpdateConnections = thingPrefix + "update_connections" + thingConnect = thingPrefix + "connect" thingDisconnect = thingPrefix + "disconnect" channelPrefix = "group." @@ -276,6 +277,19 @@ func (uche updateChannelHandlerEvent) Encode() (map[string]interface{}, error) { return val, nil } +type connectThingEvent struct { + thingID string + channelID string +} + +func (cte connectThingEvent) Encode() (map[string]interface{}, error) { + return map[string]interface{}{ + "thing_id": cte.thingID, + "channel_id": cte.channelID, + "operation": thingConnect, + }, nil +} + type disconnectThingEvent struct { thingID string channelID string diff --git a/bootstrap/events/producer/streams.go b/bootstrap/events/producer/streams.go index 5765137758..d3f93af9a6 100644 --- a/bootstrap/events/producer/streams.go +++ b/bootstrap/events/producer/streams.go @@ -207,14 +207,27 @@ func (es *eventStore) UpdateChannelHandler(ctx context.Context, channel bootstra return es.Publish(ctx, ev) } +func (es *eventStore) ConnectThingHandler(ctx context.Context, channelID, thingID string) error { + if err := es.svc.ConnectThingHandler(ctx, channelID, thingID); err != nil { + return err + } + + ev := connectThingEvent{ + thingID: thingID, + channelID: channelID, + } + + return es.Publish(ctx, ev) +} + func (es *eventStore) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error { if err := es.svc.DisconnectThingHandler(ctx, channelID, thingID); err != nil { return err } ev := disconnectThingEvent{ - thingID, - channelID, + thingID: thingID, + channelID: channelID, } return es.Publish(ctx, ev) diff --git a/bootstrap/events/producer/streams_test.go b/bootstrap/events/producer/streams_test.go index 175c79d365..2bcddd0e5c 100644 --- a/bootstrap/events/producer/streams_test.go +++ b/bootstrap/events/producer/streams_test.go @@ -48,6 +48,7 @@ const ( thingBootstrap = thingPrefix + "bootstrap" thingStateChange = thingPrefix + "change_state" thingUpdateConnections = thingPrefix + "update_connections" + thingConnect = thingPrefix + "connect" thingDisconnect = thingPrefix + "disconnect" channelPrefix = "group." @@ -1039,6 +1040,87 @@ func TestRemoveConfigHandler(t *testing.T) { } } +func TestConnectThingHandler(t *testing.T) { + err := redisClient.FlushAll(context.Background()).Err() + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + svc, boot, _, _ := newService(t, redisURL) + + err = redisClient.FlushAll(context.Background()).Err() + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + cases := []struct { + desc string + channelID string + thingID string + err error + event map[string]interface{} + }{ + { + desc: "connect thing handler successfully", + channelID: channel.ID, + thingID: "1", + err: nil, + event: map[string]interface{}{ + "channel_id": channel.ID, + "thing_id": "1", + "operation": thingConnect, + "timestamp": time.Now().UnixNano(), + "occurred_at": time.Now().UnixNano(), + }, + }, + { + desc: "add non-existing channel handler", + channelID: "unknown", + err: nil, + event: nil, + }, + { + desc: "add channel handler with empty ID", + channelID: "", + err: nil, + event: nil, + }, + { + desc: "add channel handler successfully", + channelID: channel.ID, + thingID: "1", + err: nil, + event: map[string]interface{}{ + "channel_id": channel.ID, + "thing_id": "1", + "operation": thingConnect, + "timestamp": time.Now().UnixNano(), + "occurred_at": time.Now().UnixNano(), + }, + }, + } + + lastID := "0" + for _, tc := range cases { + repoCall := boot.On("ConnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err) + err := svc.ConnectThingHandler(context.Background(), tc.channelID, tc.thingID) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + + streams := redisClient.XRead(context.Background(), &redis.XReadArgs{ + Streams: []string{streamID, lastID}, + Count: 1, + Block: time.Second, + }).Val() + + var event map[string]interface{} + if len(streams) > 0 && len(streams[0].Messages) > 0 { + msg := streams[0].Messages[0] + event = msg.Values + event["timestamp"] = msg.ID + lastID = msg.ID + } + + test(t, tc.event, event, tc.desc) + repoCall.Unset() + } +} + func TestDisconnectThingHandler(t *testing.T) { err := redisClient.FlushAll(context.Background()).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) @@ -1097,7 +1179,7 @@ func TestDisconnectThingHandler(t *testing.T) { lastID := "0" for _, tc := range cases { - repoCall := boot.On("DisconnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err) + repoCall := boot.On("DisconnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err) err := svc.DisconnectThingHandler(context.Background(), tc.channelID, tc.thingID) assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) diff --git a/bootstrap/mocks/configs.go b/bootstrap/mocks/configs.go index 6345bff34e..6f008be7ab 100644 --- a/bootstrap/mocks/configs.go +++ b/bootstrap/mocks/configs.go @@ -35,6 +35,24 @@ func (_m *ConfigRepository) ChangeState(ctx context.Context, owner string, id st return r0 } +// ConnectThing provides a mock function with given fields: ctx, channelID, thingID +func (_m *ConfigRepository) ConnectThing(ctx context.Context, channelID string, thingID string) error { + ret := _m.Called(ctx, channelID, thingID) + + if len(ret) == 0 { + panic("no return value specified for ConnectThing") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, channelID, thingID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // DisconnectThing provides a mock function with given fields: ctx, channelID, thingID func (_m *ConfigRepository) DisconnectThing(ctx context.Context, channelID string, thingID string) error { ret := _m.Called(ctx, channelID, thingID) diff --git a/bootstrap/mocks/service.go b/bootstrap/mocks/service.go index d312b15f1e..77e110c0ed 100644 --- a/bootstrap/mocks/service.go +++ b/bootstrap/mocks/service.go @@ -91,9 +91,27 @@ func (_m *Service) ChangeState(ctx context.Context, token string, id string, sta return r0 } -// DisconnectThingHandler provides a mock function with given fields: ctx, channelID, thingID -func (_m *Service) DisconnectThingHandler(ctx context.Context, channelID string, thingID string) error { - ret := _m.Called(ctx, channelID, thingID) +// ConnectThingHandler provides a mock function with given fields: ctx, channelID, ThingID +func (_m *Service) ConnectThingHandler(ctx context.Context, channelID string, ThingID string) error { + ret := _m.Called(ctx, channelID, ThingID) + + if len(ret) == 0 { + panic("no return value specified for ConnectThingHandler") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, channelID, ThingID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DisconnectThingHandler provides a mock function with given fields: ctx, channelID, ThingID +func (_m *Service) DisconnectThingHandler(ctx context.Context, channelID string, ThingID string) error { + ret := _m.Called(ctx, channelID, ThingID) if len(ret) == 0 { panic("no return value specified for DisconnectThingHandler") @@ -101,7 +119,7 @@ func (_m *Service) DisconnectThingHandler(ctx context.Context, channelID string, var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, channelID, thingID) + r0 = rf(ctx, channelID, ThingID) } else { r0 = ret.Error(0) } diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index 998f2a28aa..1221ff54ed 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -28,6 +28,7 @@ var ( errSaveConnections = errors.New("failed to insert connections to database") errUpdateChannels = errors.New("failed to update channels in bootstrap configuration database") errRemoveChannels = errors.New("failed to remove channels from bootstrap configuration in database") + errConnectThing = errors.New("failed to connect thing in bootstrap configuration in database") errDisconnectThing = errors.New("failed to disconnect thing in bootstrap configuration in database") ) @@ -47,42 +48,43 @@ func NewConfigRepository(db postgres.Database, log *slog.Logger) bootstrap.Confi return &configRepository{db: db, log: log} } -func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsConnIDs []string) (string, error) { +func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsConnIDs []string) (thingID string, err error) { q := `INSERT INTO configs (magistrala_thing, owner, name, client_cert, client_key, ca_cert, magistrala_key, external_id, external_key, content, state) - VALUES (:magistrala_thing, :owner, :name, :client_cert, :client_key, :ca_cert, :magistrala_key, :external_id, :external_key, :content, :state)` + VALUES (:magistrala_thing, :owner, :name, :client_cert, :client_key, :ca_cert, :magistrala_key, :external_id, :external_key, :content, :state)` tx, err := cr.db.BeginTxx(ctx, nil) if err != nil { return "", errors.Wrap(repoerr.ErrCreateEntity, err) } - dbcfg := toDBConfig(cfg) - if _, err := tx.NamedExec(q, dbcfg); err != nil { - e := err - if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == pgerrcode.UniqueViolation { - e = repoerr.ErrConflict + defer func() { + if err != nil { + err = cr.rollback(err, tx) } + }() - cr.rollback("Failed to insert a Config", tx) - return "", errors.Wrap(repoerr.ErrCreateEntity, e) + if _, err := tx.NamedExec(q, dbcfg); err != nil { + switch pgErr := err.(type) { + case *pgconn.PgError: + if pgErr.Code == pgerrcode.UniqueViolation { + return "", repoerr.ErrConflict + } + } + return "", err } if err := insertChannels(ctx, cfg.Owner, cfg.Channels, tx); err != nil { - cr.rollback("Failed to insert Channels", tx) return "", errors.Wrap(errSaveChannels, err) } if err := insertConnections(ctx, cfg, chsConnIDs, tx); err != nil { - cr.rollback("Failed to insert connections", tx) return "", errors.Wrap(errSaveConnections, err) } if err := tx.Commit(); err != nil { - cr.rollback("Failed to commit Config save", tx) return "", err } - return cfg.ThingID, nil } @@ -313,8 +315,13 @@ func (cr configRepository) UpdateConnections(ctx context.Context, owner, id stri return errors.Wrap(repoerr.ErrUpdateEntity, err) } + defer func() { + if err != nil { + err = cr.rollback(err, tx) + } + }() + if err := insertChannels(ctx, owner, channels, tx); err != nil { - cr.rollback("Failed to insert Channels during the update", tx) return errors.Wrap(repoerr.ErrUpdateEntity, err) } @@ -324,12 +331,10 @@ func (cr configRepository) UpdateConnections(ctx context.Context, owner, id stri return repoerr.ErrNotFound } } - cr.rollback("Failed to update connections during the update", tx) return errors.Wrap(repoerr.ErrUpdateEntity, err) } if err := tx.Commit(); err != nil { - cr.rollback("Failed to commit Config update", tx) return errors.Wrap(repoerr.ErrUpdateEntity, err) } @@ -451,12 +456,29 @@ func (cr configRepository) RemoveChannel(ctx context.Context, id string) error { return nil } +func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID string) error { + q := `UPDATE configs SET state = $1 WHERE EXISTS ( + SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` + result, err := cr.db.ExecContext(ctx, q, bootstrap.Active, thingID, channelID) + if err != nil { + return errors.Wrap(errConnectThing, err) + } + if rows, _ := result.RowsAffected(); rows == 0 { + return repoerr.ErrNotFound + } + return nil +} + func (cr configRepository) DisconnectThing(ctx context.Context, channelID, thingID string) error { q := `UPDATE configs SET state = $1 WHERE EXISTS ( SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` - if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID); err != nil { + result, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID) + if err != nil { return errors.Wrap(errDisconnectThing, err) } + if rows, _ := result.RowsAffected(); rows == 0 { + return repoerr.ErrNotFound + } return nil } @@ -483,10 +505,12 @@ func (cr configRepository) retrieveAll(owner string, filter bootstrap.Filter) (s return fmt.Sprintf(template, f), params } -func (cr configRepository) rollback(content string, tx *sqlx.Tx) { +func (cr configRepository) rollback(defErr error, tx *sqlx.Tx) error { if err := tx.Rollback(); err != nil { - cr.log.Error(fmt.Sprintf("Failed to rollback due to %s", err)) + return errors.Wrap(defErr, errors.Wrap(errors.New("failed to rollback"), err)) } + + return defErr } func insertChannels(_ context.Context, owner string, channels []bootstrap.Channel, tx *sqlx.Tx) error { diff --git a/bootstrap/postgres/configs_test.go b/bootstrap/postgres/configs_test.go index 81e3d85256..34aa394508 100644 --- a/bootstrap/postgres/configs_test.go +++ b/bootstrap/postgres/configs_test.go @@ -11,6 +11,7 @@ import ( "github.com/absmach/magistrala/bootstrap" "github.com/absmach/magistrala/bootstrap/postgres" + "github.com/absmach/magistrala/internal/testsutil" "github.com/absmach/magistrala/pkg/errors" repoerr "github.com/absmach/magistrala/pkg/errors/repository" "github.com/gofrs/uuid" @@ -675,6 +676,107 @@ func TestRemoveChannel(t *testing.T) { assert.NotContains(t, cfg.Channels, c.Channels[0], fmt.Sprintf("expected to remove channel %s from %s", c.Channels[0], cfg.Channels)) } +func TestConnectThing(t *testing.T) { + repo := postgres.NewConfigRepository(db, testLog) + err := deleteChannels(context.Background(), repo) + require.Nil(t, err, "Channels cleanup expected to succeed.") + + c := config + // Use UUID to prevent conflicts. + uid, err := uuid.NewV4() + assert.Nil(t, err, fmt.Sprintf("Got unexpected error: %s.\n", err)) + c.ThingKey = uid.String() + c.ThingID = uid.String() + c.ExternalID = uid.String() + c.ExternalKey = uid.String() + c.State = bootstrap.Inactive + saved, err := repo.Save(context.Background(), c, channels) + assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) + + wrongID := testsutil.GenerateUUID(&testing.T{}) + + connectedThing := c + connectedThing.State = bootstrap.Active + + randomThing := c + randomThingID, err := uuid.NewV4() + randomThing.ThingID = randomThingID.String() + + emptyThing := c + emptyThing.ThingID = "" + emptyThing.ThingKey = "" + emptyThing.ExternalID = "" + emptyThing.ExternalKey = "" + emptyThing.Channels = []bootstrap.Channel{} + + cases := []struct { + desc string + owner string + id string + channels []bootstrap.Channel + connections []string + err error + }{ + { + desc: "connect disconnected thing", + owner: config.Owner, + id: saved, + channels: c.Channels, + connections: channels, + err: nil, + }, + { + desc: "connect already connected thing", + owner: config.Owner, + id: connectedThing.ThingID, + channels: c.Channels, + connections: channels, + err: nil, + }, + { + desc: "connect non-existent thing", + owner: config.Owner, + id: wrongID, + channels: c.Channels, + connections: channels, + err: repoerr.ErrNotFound, + }, + { + desc: "connect random thing", + owner: config.Owner, + id: randomThing.ThingID, + channels: c.Channels, + connections: channels, + err: repoerr.ErrNotFound, + }, + { + desc: "connect empty thing", + owner: config.Owner, + id: emptyThing.ThingID, + channels: c.Channels, + connections: channels, + err: repoerr.ErrNotFound, + }, + } + for _, tc := range cases { + for i, ch := range tc.channels { + if i == 0 { + err = repo.ConnectThing(context.Background(), ch.ID, tc.id) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: Expected error: %s, got: %s.\n", tc.desc, tc.err, err)) + cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.ThingID) + assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) + assert.Equal(t, cfg.State, bootstrap.Active, fmt.Sprintf("expected to be active when a connection is added from %s", cfg)) + } else { + _ = repo.ConnectThing(context.Background(), ch.ID, tc.id) + } + } + + cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.ThingID) + assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) + assert.Equal(t, cfg.State, bootstrap.Active, fmt.Sprintf("expected to be active when a connection is added from %s", cfg)) + } +} + func TestDisconnectThing(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) err := deleteChannels(context.Background(), repo) @@ -688,15 +790,85 @@ func TestDisconnectThing(t *testing.T) { c.ThingID = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() + c.State = bootstrap.Inactive saved, err := repo.Save(context.Background(), c, channels) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - err = repo.DisconnectThing(context.Background(), c.Channels[0].ID, saved) - assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) + wrongID := testsutil.GenerateUUID(&testing.T{}) + + connectedThing := c + connectedThing.State = bootstrap.Active + + randomThing := c + randomThingID, err := uuid.NewV4() + randomThing.ThingID = randomThingID.String() + + emptyThing := c + emptyThing.ThingID = "" + emptyThing.ThingKey = "" + emptyThing.ExternalID = "" + emptyThing.ExternalKey = "" + + cases := []struct { + desc string + owner string + id string + channels []bootstrap.Channel + connections []string + err error + }{ + { + desc: "disconnect connected thing", + owner: config.Owner, + id: connectedThing.ThingID, + channels: c.Channels, + connections: channels, + err: nil, + }, + { + desc: "disconnect already disconnected thing", + owner: config.Owner, + id: saved, + channels: c.Channels, + connections: channels, + err: nil, + }, + { + desc: "disconnect invalid thing", + owner: config.Owner, + id: wrongID, + channels: c.Channels, + connections: channels, + err: repoerr.ErrNotFound, + }, + { + desc: "disconnect random thing", + owner: config.Owner, + id: randomThing.ThingID, + channels: c.Channels, + connections: channels, + err: repoerr.ErrNotFound, + }, + { + desc: "disconnect empty thing", + owner: config.Owner, + id: emptyThing.ThingID, + channels: c.Channels, + connections: channels, + err: repoerr.ErrNotFound, + }, + } + + for _, tc := range cases { + for _, ch := range tc.channels { + err = repo.DisconnectThing(context.Background(), ch.ID, tc.id) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: Expected error: %s, got: %s.\n", tc.desc, tc.err, err)) + } + } cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.ThingID) assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) - assert.Equal(t, cfg.State, bootstrap.Inactive, fmt.Sprintf("expected ti be inactive when a connection is removed from %s", cfg)) + assert.Equal(t, cfg.State, bootstrap.Inactive, fmt.Sprintf("expected to be inactive when a connection is removed from %s", cfg)) } func deleteChannels(ctx context.Context, repo bootstrap.ConfigRepository) error { diff --git a/bootstrap/service.go b/bootstrap/service.go index 1214e944c4..ac607762a4 100644 --- a/bootstrap/service.go +++ b/bootstrap/service.go @@ -41,6 +41,7 @@ var ( errRemoveConfig = errors.New("failed to remove bootstrap configuration") errRemoveChannel = errors.New("failed to remove channel") errCreateThing = errors.New("failed to create thing") + errConnectThing = errors.New("failed to connect thing") errDisconnectThing = errors.New("failed to disconnect thing") errCheckChannels = errors.New("failed to check if channels exists") errConnectionChannels = errors.New("failed to check channels connections") @@ -96,8 +97,11 @@ type Service interface { // RemoveChannelHandler removes Channel with id received from an event. RemoveChannelHandler(ctx context.Context, id string) error - // DisconnectHandler changes state of the Config when connect/disconnect event occurs. - DisconnectThingHandler(ctx context.Context, channelID, thingID string) error + // ConnectThingHandler changes state of the Config to active when connect event occurs. + ConnectThingHandler(ctx context.Context, channelID, ThingID string) error + + // DisconnectThingHandler changes state of the Config to inactive when disconnect event occurs. + DisconnectThingHandler(ctx context.Context, channelID, ThingID string) error } // ConfigReader is used to parse Config into format which will be encoded @@ -373,6 +377,13 @@ func (bs bootstrapService) RemoveChannelHandler(ctx context.Context, id string) return nil } +func (bs bootstrapService) ConnectThingHandler(ctx context.Context, channelID, thingID string) error { + if err := bs.configs.ConnectThing(ctx, channelID, thingID); err != nil { + return errors.Wrap(errConnectThing, err) + } + return nil +} + func (bs bootstrapService) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error { if err := bs.configs.DisconnectThing(ctx, channelID, thingID); err != nil { return errors.Wrap(errDisconnectThing, err) diff --git a/bootstrap/service_test.go b/bootstrap/service_test.go index fe8aa290d1..5c73b59b1e 100644 --- a/bootstrap/service_test.go +++ b/bootstrap/service_test.go @@ -144,20 +144,7 @@ func TestAdd(t *testing.T) { } func TestView(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - + svc, boot, auth, _ := newService() cases := []struct { desc string id string @@ -166,7 +153,7 @@ func TestView(t *testing.T) { }{ { desc: "view an existing config", - id: saved.ThingID, + id: config.ThingID, token: validToken, err: nil, }, @@ -195,27 +182,14 @@ func TestView(t *testing.T) { } func TestUpdate(t *testing.T) { - svc, boot, auth, sdk := newService() + svc, boot, auth, _ := newService() c := config ch := channel ch.ID = "2" c.Channels = append(c.Channels, ch) - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, c) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - - modifiedCreated := saved + modifiedCreated := c modifiedCreated.Content = "new-config" modifiedCreated.Name = "new name" @@ -242,7 +216,7 @@ func TestUpdate(t *testing.T) { }, { desc: "update a config with wrong credentials", - config: saved, + config: c, token: invalidToken, err: svcerr.ErrAuthentication, }, @@ -259,25 +233,9 @@ func TestUpdate(t *testing.T) { } func TestUpdateCert(t *testing.T) { - svc, boot, auth, sdk := newService() + svc, boot, auth, _ := newService() c := config - ch := channel - ch.ID = "2" - c.Channels = append(c.Channels, ch) - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, c) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - cases := []struct { desc string token string @@ -290,21 +248,21 @@ func TestUpdateCert(t *testing.T) { }{ { desc: "update certs for the valid config", - thingID: saved.ThingID, + thingID: c.ThingID, clientCert: "newCert", clientKey: "newKey", caCert: "newCert", token: validToken, expectedConfig: bootstrap.Config{ - Name: saved.Name, - ThingKey: saved.ThingKey, - Channels: saved.Channels, - ExternalID: saved.ExternalID, - ExternalKey: saved.ExternalKey, - Content: saved.Content, - State: saved.State, - Owner: saved.Owner, - ThingID: saved.ThingID, + Name: c.Name, + ThingKey: c.ThingKey, + Channels: c.Channels, + ExternalID: c.ExternalID, + ExternalKey: c.ExternalKey, + Content: c.Content, + State: c.State, + Owner: c.Owner, + ThingID: c.ThingID, ClientCert: "newCert", CACert: "newCert", ClientKey: "newKey", @@ -323,7 +281,7 @@ func TestUpdateCert(t *testing.T) { }, { desc: "update config cert with wrong credentials", - thingID: saved.ThingID, + thingID: c.ThingID, clientCert: "newCert", clientKey: "newKey", caCert: "newCert", @@ -584,20 +542,7 @@ func TestList(t *testing.T) { } func TestRemove(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - + svc, boot, auth, _ := newService() cases := []struct { desc string id string @@ -606,19 +551,19 @@ func TestRemove(t *testing.T) { }{ { desc: "view a config with wrong credentials", - id: saved.ThingID, + id: config.ThingID, token: invalidToken, err: svcerr.ErrAuthentication, }, { desc: "remove an existing config", - id: saved.ThingID, + id: config.ThingID, token: validToken, err: nil, }, { desc: "remove removed config", - id: saved.ThingID, + id: config.ThingID, token: validToken, err: nil, }, @@ -641,21 +586,8 @@ func TestRemove(t *testing.T) { } func TestBootstrap(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - - e, err := enc([]byte(saved.ExternalKey)) + svc, boot, _, _ := newService() + e, err := enc([]byte(config.ExternalKey)) assert.Nil(t, err, fmt.Sprintf("Encrypting external key expected to succeed: %s.\n", err)) cases := []struct { @@ -670,30 +602,30 @@ func TestBootstrap(t *testing.T) { desc: "bootstrap using invalid external id", config: bootstrap.Config{}, externalID: "invalid", - externalKey: saved.ExternalKey, + externalKey: config.ExternalKey, err: svcerr.ErrNotFound, encrypted: false, }, { desc: "bootstrap using invalid external key", config: bootstrap.Config{}, - externalID: saved.ExternalID, + externalID: config.ExternalID, externalKey: "invalid", err: bootstrap.ErrExternalKey, encrypted: false, }, { desc: "bootstrap an existing config", - config: saved, - externalID: saved.ExternalID, - externalKey: saved.ExternalKey, + config: config, + externalID: config.ExternalID, + externalKey: config.ExternalKey, err: nil, encrypted: false, }, { desc: "bootstrap encrypted", - config: saved, - externalID: saved.ExternalID, + config: config, + externalID: config.ExternalID, externalKey: hex.EncodeToString(e), err: nil, encrypted: true, @@ -711,19 +643,6 @@ func TestBootstrap(t *testing.T) { func TestChangeState(t *testing.T) { svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(toGroup(config.Channels[0]), nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - cases := []struct { desc string state bootstrap.State @@ -734,7 +653,7 @@ func TestChangeState(t *testing.T) { { desc: "change state with wrong credentials", state: bootstrap.Active, - id: saved.ThingID, + id: config.ThingID, token: invalidToken, err: svcerr.ErrAuthentication, }, @@ -748,21 +667,21 @@ func TestChangeState(t *testing.T) { { desc: "change state to Active", state: bootstrap.Active, - id: saved.ThingID, + id: config.ThingID, token: validToken, err: nil, }, { desc: "change state to current state", state: bootstrap.Active, - id: saved.ThingID, + id: config.ThingID, token: validToken, err: nil, }, { desc: "change state to Inactive", state: bootstrap.Inactive, - id: saved.ThingID, + id: config.ThingID, token: validToken, err: nil, }, @@ -785,19 +704,7 @@ func TestChangeState(t *testing.T) { } func TestUpdateChannelHandler(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - _, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() + svc, boot, _, _ := newService() ch := bootstrap.Channel{ ID: channel.ID, Name: "new name", @@ -830,20 +737,7 @@ func TestUpdateChannelHandler(t *testing.T) { } func TestRemoveChannelHandler(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - _, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - + svc, boot, _, _ := newService() cases := []struct { desc string id string @@ -851,7 +745,7 @@ func TestRemoveChannelHandler(t *testing.T) { }{ { desc: "remove an existing channel", - id: channel.ID, + id: config.Channels[0].ID, err: nil, }, { @@ -869,21 +763,8 @@ func TestRemoveChannelHandler(t *testing.T) { } } -func TestRemoveCoinfigHandler(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - +func TestRemoveConfigHandler(t *testing.T) { + svc, boot, _, _ := newService() cases := []struct { desc string id string @@ -891,7 +772,7 @@ func TestRemoveCoinfigHandler(t *testing.T) { }{ { desc: "remove an existing config", - id: saved.ThingID, + id: config.ThingID, err: nil, }, { @@ -909,21 +790,38 @@ func TestRemoveCoinfigHandler(t *testing.T) { } } -func TestDisconnectThingsHandler(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() +func TestConnectThingsHandler(t *testing.T) { + svc, boot, _, _ := newService() + cases := []struct { + desc string + thingID string + channelID string + err error + }{ + { + desc: "connect", + channelID: channel.ID, + thingID: config.ThingID, + err: nil, + }, + { + desc: "connect connected", + channelID: channel.ID, + thingID: config.ThingID, + err: svcerr.ErrAddPolicies, + }, + } + for _, tc := range cases { + repoCall := boot.On("ConnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err) + err := svc.ConnectThingHandler(context.Background(), tc.channelID, tc.thingID) + assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + repoCall.Unset() + } +} + +func TestDisconnectThingsHandler(t *testing.T) { + svc, boot, _, _ := newService() cases := []struct { desc string thingID string @@ -933,22 +831,22 @@ func TestDisconnectThingsHandler(t *testing.T) { { desc: "disconnect", channelID: channel.ID, - thingID: saved.ThingID, + thingID: config.ThingID, err: nil, }, { desc: "disconnect disconnected", channelID: channel.ID, - thingID: saved.ThingID, + thingID: config.ThingID, err: nil, }, } for _, tc := range cases { - repoCall := boot.On("DisconnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err) + svcCall := boot.On("DisconnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err) err := svc.DisconnectThingHandler(context.Background(), tc.channelID, tc.thingID) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) - repoCall.Unset() + svcCall.Unset() } } diff --git a/bootstrap/tracing/tracing.go b/bootstrap/tracing/tracing.go index 6d867aa39e..852a3f088b 100644 --- a/bootstrap/tracing/tracing.go +++ b/bootstrap/tracing/tracing.go @@ -158,6 +158,17 @@ func (tm *tracingMiddleware) RemoveChannelHandler(ctx context.Context, id string return tm.svc.RemoveChannelHandler(ctx, id) } +// ConnectThingHandler traces the "ConnectThingHandler" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) error { + ctx, span := tm.tracer.Start(ctx, "svc_connect_thing_handler", trace.WithAttributes( + attribute.String("channel_id", channelID), + attribute.String("thing_id", thingID), + )) + defer span.End() + + return tm.svc.ConnectThingHandler(ctx, channelID, thingID) +} + // DisconnectThingHandler traces the "DisconnectThingHandler" operation of the wrapped bootstrap.Service. func (tm *tracingMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error { ctx, span := tm.tracer.Start(ctx, "svc_disconnect_thing_handler", trace.WithAttributes( diff --git a/internal/groups/events/events.go b/internal/groups/events/events.go index d15a2a1caa..f35153c5c1 100644 --- a/internal/groups/events/events.go +++ b/internal/groups/events/events.go @@ -39,14 +39,18 @@ var ( ) type assignEvent struct { - memberIDs []string - groupID string + memberIDs []string + relation string + memberKind string + groupID string } func (cge assignEvent) Encode() (map[string]interface{}, error) { val := map[string]interface{}{ "operation": groupAssign, "member_ids": cge.memberIDs, + "relation": cge.relation, + "memberKind": cge.memberKind, "group_id": cge.groupID, } @@ -54,14 +58,18 @@ func (cge assignEvent) Encode() (map[string]interface{}, error) { } type unassignEvent struct { - memberIDs []string - groupID string + memberIDs []string + relation string + memberKind string + groupID string } func (cge unassignEvent) Encode() (map[string]interface{}, error) { val := map[string]interface{}{ "operation": groupUnassign, "member_ids": cge.memberIDs, + "relation": cge.relation, + "memberKind": cge.memberKind, "group_id": cge.groupID, } diff --git a/internal/groups/events/streams.go b/internal/groups/events/streams.go index 10610d6686..9fa5fede07 100644 --- a/internal/groups/events/streams.go +++ b/internal/groups/events/streams.go @@ -145,8 +145,10 @@ func (es eventStore) Assign(ctx context.Context, token, groupID, relation, membe } event := assignEvent{ - groupID: groupID, - memberIDs: memberIDs, + groupID: groupID, + relation: relation, + memberKind: memberKind, + memberIDs: memberIDs, } if err := es.Publish(ctx, event); err != nil { @@ -162,8 +164,10 @@ func (es eventStore) Unassign(ctx context.Context, token, groupID, relation, mem } event := unassignEvent{ - groupID: groupID, - memberIDs: memberIDs, + groupID: groupID, + relation: relation, + memberKind: memberKind, + memberIDs: memberIDs, } if err := es.Publish(ctx, event); err != nil {