From 55280fbb7557fd49db2f2d1f286b19989f62e9ed Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Tue, 23 Jul 2024 21:23:55 +0300 Subject: [PATCH 1/9] Fix ConnectThing in repo and service layer Signed-off-by: nyagamunene --- bootstrap/configs.go | 3 -- bootstrap/mocks/configs.go | 18 ---------- bootstrap/postgres/configs.go | 37 ++++---------------- bootstrap/postgres/configs_test.go | 56 ------------------------------ bootstrap/service.go | 23 ++++++------ 5 files changed, 20 insertions(+), 117 deletions(-) diff --git a/bootstrap/configs.go b/bootstrap/configs.go index 13333e94cd..da93210f02 100644 --- a/bootstrap/configs.go +++ b/bootstrap/configs.go @@ -94,9 +94,6 @@ type ConfigRepository interface { // by the specified user. Remove(ctx context.Context, domainID, id string) error - // ChangeState changes of the Config, that is owned by the specific user. - ChangeState(ctx context.Context, domainID, id string, state State) error - // ListExisting retrieves those channels from the given list that exist in DB. ListExisting(ctx context.Context, domainID string, ids []string) ([]Channel, error) diff --git a/bootstrap/mocks/configs.go b/bootstrap/mocks/configs.go index d088cb1356..875ec20410 100644 --- a/bootstrap/mocks/configs.go +++ b/bootstrap/mocks/configs.go @@ -17,24 +17,6 @@ type ConfigRepository struct { mock.Mock } -// ChangeState provides a mock function with given fields: ctx, domainID, id, state -func (_m *ConfigRepository) ChangeState(ctx context.Context, domainID string, id string, state bootstrap.State) error { - ret := _m.Called(ctx, domainID, id, state) - - if len(ret) == 0 { - panic("no return value specified for ChangeState") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, bootstrap.State) error); ok { - r0 = rf(ctx, domainID, id, state) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // ConnectThing provides a mock function with given fields: ctx, channelID, thingID func (_m *ConfigRepository) ConnectThing(ctx context.Context, channelID string, thingID string) error { ret := _m.Called(ctx, channelID, thingID) diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index c8b90c09a5..c15e959ca9 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -362,32 +362,6 @@ func (cr configRepository) Remove(ctx context.Context, domainID, id string) erro return nil } -func (cr configRepository) ChangeState(ctx context.Context, domainID, id string, state bootstrap.State) error { - q := `UPDATE configs SET state = :state WHERE magistrala_thing = :magistrala_thing AND domain_id = :domain_id;` - - dbcfg := dbConfig{ - ThingID: id, - State: state, - DomainID: domainID, - } - - res, err := cr.db.NamedExecContext(ctx, q, dbcfg) - if err != nil { - return errors.Wrap(repoerr.ErrUpdateEntity, err) - } - - cnt, err := res.RowsAffected() - if err != nil { - return errors.Wrap(repoerr.ErrUpdateEntity, err) - } - - if cnt == 0 { - return repoerr.ErrNotFound - } - - return nil -} - func (cr configRepository) ListExisting(ctx context.Context, domainID string, ids []string) ([]bootstrap.Channel, error) { var channels []bootstrap.Channel if len(ids) == 0 { @@ -460,8 +434,10 @@ func (cr configRepository) RemoveChannel(ctx context.Context, id string) error { } func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID string) error { - q := `UPDATE configs SET state = $1 WHERE EXISTS ( - SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` + q := `UPDATE configs SET state = $1 + WHERE magistrala_thing = $2 + AND EXISTS (SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` + result, err := cr.db.ExecContext(ctx, q, bootstrap.Active, thingID, channelID) if err != nil { return errors.Wrap(errConnectThing, err) @@ -473,8 +449,9 @@ func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID } func (cr configRepository) DisconnectThing(ctx context.Context, channelID, thingID string) error { - q := `UPDATE configs SET state = $1 WHERE EXISTS ( - SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` + q := `UPDATE configs SET state = $1 + WHERE magistrala_thing = $2 + AND EXISTS (SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID) if err != nil { return errors.Wrap(errDisconnectThing, err) diff --git a/bootstrap/postgres/configs_test.go b/bootstrap/postgres/configs_test.go index b66a52fcbd..d2b1e7131f 100644 --- a/bootstrap/postgres/configs_test.go +++ b/bootstrap/postgres/configs_test.go @@ -521,62 +521,6 @@ func TestRemove(t *testing.T) { } } -func TestChangeState(t *testing.T) { - repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(context.Background(), repo) - require.Nil(t, err, "Channels cleanup expected to succeed.") - - c := config - // Use UUID to prevent conflicts. - uid, err := uuid.NewV4() - assert.Nil(t, err, fmt.Sprintf("Got unexpected error: %s.\n", err)) - c.ThingKey = uid.String() - c.ThingID = uid.String() - c.ExternalID = uid.String() - c.ExternalKey = uid.String() - saved, err := repo.Save(context.Background(), c, channels) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - - cases := []struct { - desc string - domainID string - id string - state bootstrap.State - err error - }{ - { - desc: "change state with wrong domain ID ", - id: saved, - domainID: "2", - err: repoerr.ErrNotFound, - }, - { - desc: "change state with wrong id", - id: "wrong", - domainID: c.DomainID, - err: repoerr.ErrNotFound, - }, - { - desc: "change state to Active", - id: saved, - domainID: c.DomainID, - state: bootstrap.Active, - err: nil, - }, - { - desc: "change state to Inactive", - id: saved, - domainID: c.DomainID, - state: bootstrap.Inactive, - err: nil, - }, - } - for _, tc := range cases { - err := repo.ChangeState(context.Background(), tc.domainID, tc.id, tc.state) - assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) - } -} - func TestListExisting(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) err := deleteChannels(context.Background(), repo) diff --git a/bootstrap/service.go b/bootstrap/service.go index e3a713625d..db07706027 100644 --- a/bootstrap/service.go +++ b/bootstrap/service.go @@ -170,10 +170,21 @@ func (bs bootstrapService) Add(ctx context.Context, token string, cfg Config) (C return Config{}, errors.Wrap(svcerr.ErrMalformedEntity, errNotInSameDomain) } } + state := Inactive + pm := mgsdk.PageMetadata{} + tp := mgsdk.ThingsPage{} + for _, channel := range cfg.Channels { + if tp, err = bs.sdk.ThingsByChannel(channel.ID, pm, token); err != nil { + return Config{}, errors.Wrap(svcerr.ErrMalformedEntity, err) + } + if tp.Total > 0 { + state = Active + } + } cfg.ThingID = mgThing.ID cfg.DomainID = user.GetDomainId() - cfg.State = Inactive + cfg.State = state cfg.ThingKey = mgThing.Credentials.Secret saved, err := bs.configs.Save(ctx, cfg, toConnect) @@ -410,15 +421,10 @@ func (bs bootstrapService) ChangeState(ctx context.Context, token, id string, st if cfg.State == state { return nil } - switch state { case Active: for _, c := range cfg.Channels { - conIDs := mgsdk.Connection{ - ChannelID: c.ID, - ThingID: cfg.ThingID, - } - if err := bs.sdk.Connect(conIDs, token); err != nil { + if err := bs.sdk.ConnectThing(cfg.ThingID, c.ID, token); err != nil { // Ignore conflict errors as they indicate the connection already exists. if errors.Contains(err, svcerr.ErrConflict) { continue @@ -436,9 +442,6 @@ func (bs bootstrapService) ChangeState(ctx context.Context, token, id string, st } } } - if err := bs.configs.ChangeState(ctx, user.GetDomainId(), id, state); err != nil { - return errors.Wrap(errChangeState, err) - } return nil } From 197acc929b5b60a05bf313753700765598f2ef54 Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Tue, 23 Jul 2024 21:46:28 +0300 Subject: [PATCH 2/9] Refactor code in Add Signed-off-by: nyagamunene --- bootstrap/service.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/bootstrap/service.go b/bootstrap/service.go index db07706027..cf39fbfb0b 100644 --- a/bootstrap/service.go +++ b/bootstrap/service.go @@ -170,15 +170,17 @@ func (bs bootstrapService) Add(ctx context.Context, token string, cfg Config) (C return Config{}, errors.Wrap(svcerr.ErrMalformedEntity, errNotInSameDomain) } } + state := Inactive pm := mgsdk.PageMetadata{} - tp := mgsdk.ThingsPage{} for _, channel := range cfg.Channels { - if tp, err = bs.sdk.ThingsByChannel(channel.ID, pm, token); err != nil { + tp, err := bs.sdk.ThingsByChannel(channel.ID, pm, token) + if err != nil { return Config{}, errors.Wrap(svcerr.ErrMalformedEntity, err) } if tp.Total > 0 { state = Active + break } } From 530c2d78b97dc1f37404585dbde4f05eccf1e693 Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Wed, 24 Jul 2024 13:19:57 +0300 Subject: [PATCH 3/9] Refactor check for connection and add comments Signed-off-by: nyagamunene --- bootstrap/service.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/bootstrap/service.go b/bootstrap/service.go index cf39fbfb0b..875514df79 100644 --- a/bootstrap/service.go +++ b/bootstrap/service.go @@ -146,6 +146,23 @@ func (bs bootstrapService) Add(ctx context.Context, token string, cfg Config) (C return Config{}, err } + // Check for existing connection between channel and thing + // If it exists set state to acive + state := Inactive + pm := mgsdk.PageMetadata{} + for _, channel := range cfg.Channels { + tp, err := bs.sdk.ThingsByChannel(channel.ID, pm, token) + if err != nil { + return Config{}, errors.Wrap(svcerr.ErrMalformedEntity, err) + } + for _, thing := range tp.Things { + if thing.ID == cfg.ThingID { + state = Active + break + } + } + } + toConnect := bs.toIDList(cfg.Channels) // Check if channels exist. This is the way to prevent fetching channels that already exist. @@ -171,19 +188,6 @@ func (bs bootstrapService) Add(ctx context.Context, token string, cfg Config) (C } } - state := Inactive - pm := mgsdk.PageMetadata{} - for _, channel := range cfg.Channels { - tp, err := bs.sdk.ThingsByChannel(channel.ID, pm, token) - if err != nil { - return Config{}, errors.Wrap(svcerr.ErrMalformedEntity, err) - } - if tp.Total > 0 { - state = Active - break - } - } - cfg.ThingID = mgThing.ID cfg.DomainID = user.GetDomainId() cfg.State = state From 83851c83071510cbc5256bfd995410e25418784e Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Wed, 24 Jul 2024 13:48:04 +0300 Subject: [PATCH 4/9] Add tests cases for TestAdd Signed-off-by: nyagamunene --- bootstrap/service_test.go | 45 +++++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/bootstrap/service_test.go b/bootstrap/service_test.go index 63960a841f..2d99e055e1 100644 --- a/bootstrap/service_test.go +++ b/bootstrap/service_test.go @@ -96,6 +96,7 @@ func TestAdd(t *testing.T) { cases := []struct { desc string config bootstrap.Config + page mgsdk.ThingsPage token string userID string domainID string @@ -119,6 +120,22 @@ func TestAdd(t *testing.T) { authResponse: &magistrala.AuthorizeRes{Authorized: true}, err: nil, }, + { + desc: "add config with an existing connection", + config: config, + page: mgsdk.ThingsPage{ + Things: []mgsdk.Thing{ + { + ID: config.ThingID, + }, + }, + }, + token: validToken, + userID: validID, + domainID: domainID, + authResponse: &magistrala.AuthorizeRes{Authorized: true}, + err: nil, + }, { desc: "add a config with an invalid ID", config: neID, @@ -194,11 +211,12 @@ func TestAdd(t *testing.T) { for _, tc := range cases { authCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: tc.token}).Return(&magistrala.IdentityRes{Id: tc.userID, DomainId: tc.domainID}, tc.identifyErr) authCall1 := auth.On("Authorize", context.Background(), mock.Anything).Return(tc.authResponse, tc.authorizeErr) - repoCall := sdk.On("Thing", tc.config.ThingID, tc.token).Return(mgsdk.Thing{ID: tc.config.ThingID, Credentials: mgsdk.Credentials{Secret: tc.config.ThingKey}}, tc.thingErr) - repoCall1 := sdk.On("CreateThing", mock.Anything, tc.token).Return(mgsdk.Thing{}, tc.createThingErr) - repoCall2 := sdk.On("DeleteThing", tc.config.ThingID, tc.token).Return(tc.deleteThingErr) - repoCall3 := boot.On("ListExisting", context.Background(), tc.domainID, mock.Anything).Return(tc.config.Channels, tc.listExistingErr) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, tc.saveErr) + repoCall := sdk.On("ThingsByChannel", mock.Anything, mock.Anything, mock.Anything).Return(tc.page, tc.thingErr) + repoCall1 := sdk.On("Thing", tc.config.ThingID, tc.token).Return(mgsdk.Thing{ID: tc.config.ThingID, Credentials: mgsdk.Credentials{Secret: tc.config.ThingKey}}, tc.thingErr) + repoCall2 := sdk.On("CreateThing", mock.Anything, tc.token).Return(mgsdk.Thing{}, tc.createThingErr) + repoCall3 := sdk.On("DeleteThing", tc.config.ThingID, tc.token).Return(tc.deleteThingErr) + repoCall4 := boot.On("ListExisting", context.Background(), tc.domainID, mock.Anything).Return(tc.config.Channels, tc.listExistingErr) + repoCall5 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, tc.saveErr) _, err := c.Add(context.Background(), tc.token, tc.config) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) @@ -210,6 +228,7 @@ func TestAdd(t *testing.T) { repoCall2.Unset() repoCall3.Unset() repoCall4.Unset() + repoCall5.Unset() } } @@ -1204,30 +1223,20 @@ func TestChangeState(t *testing.T) { connectErr: errors.NewSDKError(bootstrap.ErrThings), err: bootstrap.ErrThings, }, - { - desc: "change state with invalid state", - state: bootstrap.State(2), - id: c.ThingID, - token: validToken, - userID: validID, - domainID: domainID, - stateErr: svcerr.ErrMalformedEntity, - err: svcerr.ErrMalformedEntity, - }, } for _, tc := range cases { authCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: tc.token}).Return(&magistrala.IdentityRes{Id: tc.userID, DomainId: tc.domainID}, tc.identifyErr) repoCall := boot.On("RetrieveByID", context.Background(), tc.domainID, tc.id).Return(c, tc.retrieveErr) - sdkCall := sdk.On("Connect", mock.Anything, mock.Anything).Return(tc.connectErr) - repoCall1 := boot.On("ChangeState", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(tc.stateErr) + sdkCall := sdk.On("ConnectThing", mock.Anything, mock.Anything, mock.Anything).Return(tc.connectErr) + // repoCall1 := boot.On("ChangeState", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(tc.stateErr) err := svc.ChangeState(context.Background(), tc.token, tc.id, tc.state) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) authCall.Unset() sdkCall.Unset() repoCall.Unset() - repoCall1.Unset() + // repoCall1.Unset() } } From f7a9fe29ce381379a16fb99a91299f1e24aff9f7 Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Wed, 24 Jul 2024 13:54:57 +0300 Subject: [PATCH 5/9] Fix lint errors Signed-off-by: nyagamunene --- bootstrap/service_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/bootstrap/service_test.go b/bootstrap/service_test.go index 2d99e055e1..69c11cee0a 100644 --- a/bootstrap/service_test.go +++ b/bootstrap/service_test.go @@ -1229,14 +1229,12 @@ func TestChangeState(t *testing.T) { authCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: tc.token}).Return(&magistrala.IdentityRes{Id: tc.userID, DomainId: tc.domainID}, tc.identifyErr) repoCall := boot.On("RetrieveByID", context.Background(), tc.domainID, tc.id).Return(c, tc.retrieveErr) sdkCall := sdk.On("ConnectThing", mock.Anything, mock.Anything, mock.Anything).Return(tc.connectErr) - // repoCall1 := boot.On("ChangeState", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(tc.stateErr) err := svc.ChangeState(context.Background(), tc.token, tc.id, tc.state) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) authCall.Unset() sdkCall.Unset() repoCall.Unset() - // repoCall1.Unset() } } From 4e41ebca9618051a005cb036bd1422555e303a1f Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Wed, 24 Jul 2024 14:23:03 +0300 Subject: [PATCH 6/9] Fix TestAdd in stream tests Signed-off-by: nyagamunene --- bootstrap/events/producer/streams_test.go | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/bootstrap/events/producer/streams_test.go b/bootstrap/events/producer/streams_test.go index 759c689a2d..18e6fded9c 100644 --- a/bootstrap/events/producer/streams_test.go +++ b/bootstrap/events/producer/streams_test.go @@ -123,6 +123,7 @@ func TestAdd(t *testing.T) { identifyErr error thingErr error channel []bootstrap.Channel + page mgsdk.ThingsPage listErr error saveErr error err error @@ -209,6 +210,7 @@ func TestAdd(t *testing.T) { sdkCall := sdk.On("Thing", tc.config.ThingID, tc.token).Return(mgsdk.Thing{ID: tc.config.ThingID, Credentials: mgsdk.Credentials{Secret: tc.config.ThingKey}}, errors.NewSDKError(tc.thingErr)) repoCall := boot.On("ListExisting", context.Background(), domainID, mock.Anything).Return(tc.config.Channels, tc.listErr) repoCall1 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, tc.saveErr) + repoCall3 := sdk.On("ThingsByChannel", mock.Anything, mock.Anything, mock.Anything).Return(tc.page, errors.NewSDKError(tc.thingErr)) _, err := svc.Add(context.Background(), tc.token, tc.config) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) @@ -232,6 +234,7 @@ func TestAdd(t *testing.T) { sdkCall.Unset() repoCall.Unset() repoCall1.Unset() + repoCall3.Unset() } } @@ -1273,25 +1276,13 @@ func TestChangeState(t *testing.T) { err: bootstrap.ErrThings, event: nil, }, - { - desc: "change state unsuccessfully", - id: config.ThingID, - token: validToken, - userID: validID, - domainID: domainID, - state: bootstrap.Active, - stateErr: svcerr.ErrUpdateEntity, - err: svcerr.ErrUpdateEntity, - event: nil, - }, } lastID := "0" for _, tc := range cases { authCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: tc.token}).Return(&magistrala.IdentityRes{Id: tc.userID, DomainId: tc.domainID}, tc.identifyErr) repoCall := boot.On("RetrieveByID", context.Background(), tc.domainID, tc.id).Return(config, tc.retrieveErr) - sdkCall1 := sdk.On("Connect", mock.Anything, mock.Anything).Return(errors.NewSDKError(tc.connectErr)) - repoCall1 := boot.On("ChangeState", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(tc.stateErr) + sdkCall1 := sdk.On("ConnectThing", mock.Anything, mock.Anything, mock.Anything).Return(errors.NewSDKError(tc.connectErr)) err := svc.ChangeState(context.Background(), tc.token, tc.id, tc.state) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) @@ -1311,7 +1302,6 @@ func TestChangeState(t *testing.T) { authCall.Unset() sdkCall1.Unset() repoCall.Unset() - repoCall1.Unset() } } From b4c0b27fba9deb25fbce88f665dc3340fc47e557 Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Thu, 25 Jul 2024 11:40:00 +0300 Subject: [PATCH 7/9] Fix the limit issue Signed-off-by: nyagamunene --- bootstrap/service.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bootstrap/service.go b/bootstrap/service.go index 875514df79..233c5a2527 100644 --- a/bootstrap/service.go +++ b/bootstrap/service.go @@ -149,7 +149,8 @@ func (bs bootstrapService) Add(ctx context.Context, token string, cfg Config) (C // Check for existing connection between channel and thing // If it exists set state to acive state := Inactive - pm := mgsdk.PageMetadata{} + limit := uint64(^uint(0)) + pm := mgsdk.PageMetadata{Limit: limit} for _, channel := range cfg.Channels { tp, err := bs.sdk.ThingsByChannel(channel.ID, pm, token) if err != nil { From c322ab2eb4c739c766d9cc83f9b7deb3bb817e6b Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Thu, 25 Jul 2024 15:51:59 +0300 Subject: [PATCH 8/9] Revert changes Signed-off-by: nyagamunene --- bootstrap/configs.go | 3 ++ bootstrap/events/producer/streams_test.go | 19 ++++++-- bootstrap/mocks/configs.go | 18 ++++++++ bootstrap/postgres/configs.go | 26 +++++++++++ bootstrap/postgres/configs_test.go | 56 +++++++++++++++++++++++ bootstrap/service.go | 29 ++++-------- bootstrap/service_test.go | 44 ++++++++---------- 7 files changed, 146 insertions(+), 49 deletions(-) diff --git a/bootstrap/configs.go b/bootstrap/configs.go index da93210f02..13333e94cd 100644 --- a/bootstrap/configs.go +++ b/bootstrap/configs.go @@ -94,6 +94,9 @@ type ConfigRepository interface { // by the specified user. Remove(ctx context.Context, domainID, id string) error + // ChangeState changes of the Config, that is owned by the specific user. + ChangeState(ctx context.Context, domainID, id string, state State) error + // ListExisting retrieves those channels from the given list that exist in DB. ListExisting(ctx context.Context, domainID string, ids []string) ([]Channel, error) diff --git a/bootstrap/events/producer/streams_test.go b/bootstrap/events/producer/streams_test.go index 18e6fded9c..8ece934540 100644 --- a/bootstrap/events/producer/streams_test.go +++ b/bootstrap/events/producer/streams_test.go @@ -123,7 +123,6 @@ func TestAdd(t *testing.T) { identifyErr error thingErr error channel []bootstrap.Channel - page mgsdk.ThingsPage listErr error saveErr error err error @@ -210,7 +209,6 @@ func TestAdd(t *testing.T) { sdkCall := sdk.On("Thing", tc.config.ThingID, tc.token).Return(mgsdk.Thing{ID: tc.config.ThingID, Credentials: mgsdk.Credentials{Secret: tc.config.ThingKey}}, errors.NewSDKError(tc.thingErr)) repoCall := boot.On("ListExisting", context.Background(), domainID, mock.Anything).Return(tc.config.Channels, tc.listErr) repoCall1 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, tc.saveErr) - repoCall3 := sdk.On("ThingsByChannel", mock.Anything, mock.Anything, mock.Anything).Return(tc.page, errors.NewSDKError(tc.thingErr)) _, err := svc.Add(context.Background(), tc.token, tc.config) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) @@ -234,7 +232,6 @@ func TestAdd(t *testing.T) { sdkCall.Unset() repoCall.Unset() repoCall1.Unset() - repoCall3.Unset() } } @@ -1276,13 +1273,26 @@ func TestChangeState(t *testing.T) { err: bootstrap.ErrThings, event: nil, }, + { + desc: "change state unsuccessfully", + id: config.ThingID, + token: validToken, + userID: validID, + domainID: domainID, + state: bootstrap.Active, + stateErr: svcerr.ErrUpdateEntity, + err: svcerr.ErrUpdateEntity, + event: nil, + }, } lastID := "0" for _, tc := range cases { authCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: tc.token}).Return(&magistrala.IdentityRes{Id: tc.userID, DomainId: tc.domainID}, tc.identifyErr) repoCall := boot.On("RetrieveByID", context.Background(), tc.domainID, tc.id).Return(config, tc.retrieveErr) - sdkCall1 := sdk.On("ConnectThing", mock.Anything, mock.Anything, mock.Anything).Return(errors.NewSDKError(tc.connectErr)) + sdkCall1 := sdk.On("Connect", mock.Anything, mock.Anything).Return(errors.NewSDKError(tc.connectErr)) + repoCall1 := boot.On("ChangeState", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(tc.stateErr) + err := svc.ChangeState(context.Background(), tc.token, tc.id, tc.state) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) @@ -1302,6 +1312,7 @@ func TestChangeState(t *testing.T) { authCall.Unset() sdkCall1.Unset() repoCall.Unset() + repoCall1.Unset() } } diff --git a/bootstrap/mocks/configs.go b/bootstrap/mocks/configs.go index 875ec20410..d088cb1356 100644 --- a/bootstrap/mocks/configs.go +++ b/bootstrap/mocks/configs.go @@ -17,6 +17,24 @@ type ConfigRepository struct { mock.Mock } +// ChangeState provides a mock function with given fields: ctx, domainID, id, state +func (_m *ConfigRepository) ChangeState(ctx context.Context, domainID string, id string, state bootstrap.State) error { + ret := _m.Called(ctx, domainID, id, state) + + if len(ret) == 0 { + panic("no return value specified for ChangeState") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, bootstrap.State) error); ok { + r0 = rf(ctx, domainID, id, state) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // ConnectThing provides a mock function with given fields: ctx, channelID, thingID func (_m *ConfigRepository) ConnectThing(ctx context.Context, channelID string, thingID string) error { ret := _m.Called(ctx, channelID, thingID) diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index c15e959ca9..7c807eb632 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -362,6 +362,32 @@ func (cr configRepository) Remove(ctx context.Context, domainID, id string) erro return nil } +func (cr configRepository) ChangeState(ctx context.Context, domainID, id string, state bootstrap.State) error { + q := `UPDATE configs SET state = :state WHERE magistrala_thing = :magistrala_thing AND domain_id = :domain_id;` + + dbcfg := dbConfig{ + ThingID: id, + State: state, + DomainID: domainID, + } + + res, err := cr.db.NamedExecContext(ctx, q, dbcfg) + if err != nil { + return errors.Wrap(repoerr.ErrUpdateEntity, err) + } + + cnt, err := res.RowsAffected() + if err != nil { + return errors.Wrap(repoerr.ErrUpdateEntity, err) + } + + if cnt == 0 { + return repoerr.ErrNotFound + } + + return nil +} + func (cr configRepository) ListExisting(ctx context.Context, domainID string, ids []string) ([]bootstrap.Channel, error) { var channels []bootstrap.Channel if len(ids) == 0 { diff --git a/bootstrap/postgres/configs_test.go b/bootstrap/postgres/configs_test.go index d2b1e7131f..b66a52fcbd 100644 --- a/bootstrap/postgres/configs_test.go +++ b/bootstrap/postgres/configs_test.go @@ -521,6 +521,62 @@ func TestRemove(t *testing.T) { } } +func TestChangeState(t *testing.T) { + repo := postgres.NewConfigRepository(db, testLog) + err := deleteChannels(context.Background(), repo) + require.Nil(t, err, "Channels cleanup expected to succeed.") + + c := config + // Use UUID to prevent conflicts. + uid, err := uuid.NewV4() + assert.Nil(t, err, fmt.Sprintf("Got unexpected error: %s.\n", err)) + c.ThingKey = uid.String() + c.ThingID = uid.String() + c.ExternalID = uid.String() + c.ExternalKey = uid.String() + saved, err := repo.Save(context.Background(), c, channels) + assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) + + cases := []struct { + desc string + domainID string + id string + state bootstrap.State + err error + }{ + { + desc: "change state with wrong domain ID ", + id: saved, + domainID: "2", + err: repoerr.ErrNotFound, + }, + { + desc: "change state with wrong id", + id: "wrong", + domainID: c.DomainID, + err: repoerr.ErrNotFound, + }, + { + desc: "change state to Active", + id: saved, + domainID: c.DomainID, + state: bootstrap.Active, + err: nil, + }, + { + desc: "change state to Inactive", + id: saved, + domainID: c.DomainID, + state: bootstrap.Inactive, + err: nil, + }, + } + for _, tc := range cases { + err := repo.ChangeState(context.Background(), tc.domainID, tc.id, tc.state) + assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + } +} + func TestListExisting(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) err := deleteChannels(context.Background(), repo) diff --git a/bootstrap/service.go b/bootstrap/service.go index 233c5a2527..947a1ee972 100644 --- a/bootstrap/service.go +++ b/bootstrap/service.go @@ -146,24 +146,6 @@ func (bs bootstrapService) Add(ctx context.Context, token string, cfg Config) (C return Config{}, err } - // Check for existing connection between channel and thing - // If it exists set state to acive - state := Inactive - limit := uint64(^uint(0)) - pm := mgsdk.PageMetadata{Limit: limit} - for _, channel := range cfg.Channels { - tp, err := bs.sdk.ThingsByChannel(channel.ID, pm, token) - if err != nil { - return Config{}, errors.Wrap(svcerr.ErrMalformedEntity, err) - } - for _, thing := range tp.Things { - if thing.ID == cfg.ThingID { - state = Active - break - } - } - } - toConnect := bs.toIDList(cfg.Channels) // Check if channels exist. This is the way to prevent fetching channels that already exist. @@ -191,7 +173,7 @@ func (bs bootstrapService) Add(ctx context.Context, token string, cfg Config) (C cfg.ThingID = mgThing.ID cfg.DomainID = user.GetDomainId() - cfg.State = state + cfg.State = Inactive cfg.ThingKey = mgThing.Credentials.Secret saved, err := bs.configs.Save(ctx, cfg, toConnect) @@ -431,7 +413,11 @@ func (bs bootstrapService) ChangeState(ctx context.Context, token, id string, st switch state { case Active: for _, c := range cfg.Channels { - if err := bs.sdk.ConnectThing(cfg.ThingID, c.ID, token); err != nil { + conIDs := mgsdk.Connection{ + ChannelID: c.ID, + ThingID: cfg.ThingID, + } + if err := bs.sdk.Connect(conIDs, token); err != nil { // Ignore conflict errors as they indicate the connection already exists. if errors.Contains(err, svcerr.ErrConflict) { continue @@ -449,6 +435,9 @@ func (bs bootstrapService) ChangeState(ctx context.Context, token, id string, st } } } + if err := bs.configs.ChangeState(ctx, user.GetDomainId(), id, state); err != nil { + return errors.Wrap(errChangeState, err) + } return nil } diff --git a/bootstrap/service_test.go b/bootstrap/service_test.go index 69c11cee0a..54fd2f35b3 100644 --- a/bootstrap/service_test.go +++ b/bootstrap/service_test.go @@ -96,7 +96,6 @@ func TestAdd(t *testing.T) { cases := []struct { desc string config bootstrap.Config - page mgsdk.ThingsPage token string userID string domainID string @@ -120,22 +119,6 @@ func TestAdd(t *testing.T) { authResponse: &magistrala.AuthorizeRes{Authorized: true}, err: nil, }, - { - desc: "add config with an existing connection", - config: config, - page: mgsdk.ThingsPage{ - Things: []mgsdk.Thing{ - { - ID: config.ThingID, - }, - }, - }, - token: validToken, - userID: validID, - domainID: domainID, - authResponse: &magistrala.AuthorizeRes{Authorized: true}, - err: nil, - }, { desc: "add a config with an invalid ID", config: neID, @@ -211,12 +194,12 @@ func TestAdd(t *testing.T) { for _, tc := range cases { authCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: tc.token}).Return(&magistrala.IdentityRes{Id: tc.userID, DomainId: tc.domainID}, tc.identifyErr) authCall1 := auth.On("Authorize", context.Background(), mock.Anything).Return(tc.authResponse, tc.authorizeErr) - repoCall := sdk.On("ThingsByChannel", mock.Anything, mock.Anything, mock.Anything).Return(tc.page, tc.thingErr) - repoCall1 := sdk.On("Thing", tc.config.ThingID, tc.token).Return(mgsdk.Thing{ID: tc.config.ThingID, Credentials: mgsdk.Credentials{Secret: tc.config.ThingKey}}, tc.thingErr) - repoCall2 := sdk.On("CreateThing", mock.Anything, tc.token).Return(mgsdk.Thing{}, tc.createThingErr) - repoCall3 := sdk.On("DeleteThing", tc.config.ThingID, tc.token).Return(tc.deleteThingErr) - repoCall4 := boot.On("ListExisting", context.Background(), tc.domainID, mock.Anything).Return(tc.config.Channels, tc.listExistingErr) - repoCall5 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, tc.saveErr) + + repoCall := sdk.On("Thing", tc.config.ThingID, tc.token).Return(mgsdk.Thing{ID: tc.config.ThingID, Credentials: mgsdk.Credentials{Secret: tc.config.ThingKey}}, tc.thingErr) + repoCall1 := sdk.On("CreateThing", mock.Anything, tc.token).Return(mgsdk.Thing{}, tc.createThingErr) + repoCall2 := sdk.On("DeleteThing", tc.config.ThingID, tc.token).Return(tc.deleteThingErr) + repoCall3 := boot.On("ListExisting", context.Background(), tc.domainID, mock.Anything).Return(tc.config.Channels, tc.listExistingErr) + repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, tc.saveErr) _, err := c.Add(context.Background(), tc.token, tc.config) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) @@ -228,7 +211,6 @@ func TestAdd(t *testing.T) { repoCall2.Unset() repoCall3.Unset() repoCall4.Unset() - repoCall5.Unset() } } @@ -1223,18 +1205,30 @@ func TestChangeState(t *testing.T) { connectErr: errors.NewSDKError(bootstrap.ErrThings), err: bootstrap.ErrThings, }, + { + desc: "change state with invalid state", + state: bootstrap.State(2), + id: c.ThingID, + token: validToken, + userID: validID, + domainID: domainID, + stateErr: svcerr.ErrMalformedEntity, + err: svcerr.ErrMalformedEntity, + }, } for _, tc := range cases { authCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: tc.token}).Return(&magistrala.IdentityRes{Id: tc.userID, DomainId: tc.domainID}, tc.identifyErr) repoCall := boot.On("RetrieveByID", context.Background(), tc.domainID, tc.id).Return(c, tc.retrieveErr) - sdkCall := sdk.On("ConnectThing", mock.Anything, mock.Anything, mock.Anything).Return(tc.connectErr) + sdkCall := sdk.On("Connect", mock.Anything, mock.Anything).Return(tc.connectErr) + repoCall1 := boot.On("ChangeState", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(tc.stateErr) err := svc.ChangeState(context.Background(), tc.token, tc.id, tc.state) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) authCall.Unset() sdkCall.Unset() repoCall.Unset() + repoCall1.Unset() } } From 358cb9ad75fdbfbe5624f60b74eac047f1336260 Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Thu, 25 Jul 2024 15:55:51 +0300 Subject: [PATCH 9/9] Remove trailing empty spaces Signed-off-by: nyagamunene --- bootstrap/events/producer/streams_test.go | 1 - bootstrap/service.go | 1 + bootstrap/service_test.go | 1 - 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/bootstrap/events/producer/streams_test.go b/bootstrap/events/producer/streams_test.go index 8ece934540..759c689a2d 100644 --- a/bootstrap/events/producer/streams_test.go +++ b/bootstrap/events/producer/streams_test.go @@ -1292,7 +1292,6 @@ func TestChangeState(t *testing.T) { repoCall := boot.On("RetrieveByID", context.Background(), tc.domainID, tc.id).Return(config, tc.retrieveErr) sdkCall1 := sdk.On("Connect", mock.Anything, mock.Anything).Return(errors.NewSDKError(tc.connectErr)) repoCall1 := boot.On("ChangeState", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(tc.stateErr) - err := svc.ChangeState(context.Background(), tc.token, tc.id, tc.state) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) diff --git a/bootstrap/service.go b/bootstrap/service.go index 947a1ee972..e3a713625d 100644 --- a/bootstrap/service.go +++ b/bootstrap/service.go @@ -410,6 +410,7 @@ func (bs bootstrapService) ChangeState(ctx context.Context, token, id string, st if cfg.State == state { return nil } + switch state { case Active: for _, c := range cfg.Channels { diff --git a/bootstrap/service_test.go b/bootstrap/service_test.go index 54fd2f35b3..63960a841f 100644 --- a/bootstrap/service_test.go +++ b/bootstrap/service_test.go @@ -194,7 +194,6 @@ func TestAdd(t *testing.T) { for _, tc := range cases { authCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: tc.token}).Return(&magistrala.IdentityRes{Id: tc.userID, DomainId: tc.domainID}, tc.identifyErr) authCall1 := auth.On("Authorize", context.Background(), mock.Anything).Return(tc.authResponse, tc.authorizeErr) - repoCall := sdk.On("Thing", tc.config.ThingID, tc.token).Return(mgsdk.Thing{ID: tc.config.ThingID, Credentials: mgsdk.Credentials{Secret: tc.config.ThingKey}}, tc.thingErr) repoCall1 := sdk.On("CreateThing", mock.Anything, tc.token).Return(mgsdk.Thing{}, tc.createThingErr) repoCall2 := sdk.On("DeleteThing", tc.config.ThingID, tc.token).Return(tc.deleteThingErr)