From 8af3e839b1e9b182fce5ad31103c16d324ec4c40 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar <65710+neekolas@users.noreply.github.com> Date: Tue, 31 Dec 2024 12:39:36 -0300 Subject: [PATCH] Start publishing events --- .mockery.yaml | 7 + pkg/api/server.go | 2 +- pkg/identity/api/v1/identity_service.go | 60 ++- pkg/identity/api/v1/identity_service_test.go | 39 +- pkg/identity/types/publish-result.go | 56 +++ pkg/mls/store/store.go | 28 +- ...tyApi_SubscribeAssociationChangesServer.go | 350 ++++++++++++++++++ 7 files changed, 526 insertions(+), 16 deletions(-) create mode 100644 pkg/identity/types/publish-result.go create mode 100644 pkg/mocks/mock_IdentityApi_SubscribeAssociationChangesServer.go diff --git a/.mockery.yaml b/.mockery.yaml index c21bd77d..1eb74428 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -1,5 +1,12 @@ with-expecter: true packages: + github.com/xmtp/xmtp-node-go/pkg/proto/identity/api/v1: + config: + dir: ./pkg/mocks + outpkg: mocks + interfaces: + IdentityApi_SubscribeAssociationChangesServer: + config: github.com/xmtp/xmtp-node-go/pkg/proto/mls/api/v1: config: dir: ./pkg/mocks diff --git a/pkg/api/server.go b/pkg/api/server.go index 93f60f30..8224197d 100644 --- a/pkg/api/server.go +++ b/pkg/api/server.go @@ -175,7 +175,7 @@ func (s *Server) startGRPC() error { } mlsv1pb.RegisterMlsApiServer(grpcServer, s.mlsv1) - s.identityv1, err = identityv1.NewService(s.Log, s.Config.MLSStore, s.Config.MLSValidator, s.natsServer) + s.identityv1, err = identityv1.NewService(s.Log, s.Config.MLSStore, s.Config.MLSValidator, s.natsServer, publishToWakuRelay) if err != nil { return errors.Wrap(err, "creating identity service") } diff --git a/pkg/identity/api/v1/identity_service.go b/pkg/identity/api/v1/identity_service.go index 2deff489..189fdd6f 100644 --- a/pkg/identity/api/v1/identity_service.go +++ b/pkg/identity/api/v1/identity_service.go @@ -5,7 +5,9 @@ import ( "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" + wakupb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/xmtp/xmtp-node-go/pkg/envelopes" + identityTypes "github.com/xmtp/xmtp-node-go/pkg/identity/types" mlsstore "github.com/xmtp/xmtp-node-go/pkg/mls/store" "github.com/xmtp/xmtp-node-go/pkg/mlsvalidate" api "github.com/xmtp/xmtp-node-go/pkg/proto/identity/api/v1" @@ -24,16 +26,24 @@ type Service struct { store mlsstore.MlsStore validationService mlsvalidate.MLSValidationService - ctx context.Context - nc *nats.Conn - ctxCancel func() + ctx context.Context + nc *nats.Conn + publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error + ctxCancel func() } -func NewService(log *zap.Logger, store mlsstore.MlsStore, validationService mlsvalidate.MLSValidationService, natsServer *server.Server) (s *Service, err error) { +func NewService( + log *zap.Logger, + store mlsstore.MlsStore, + validationService mlsvalidate.MLSValidationService, + natsServer *server.Server, + publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error, +) (s *Service, err error) { s = &Service{ - log: log.Named("identity"), - store: store, - validationService: validationService, + log: log.Named("identity"), + store: store, + validationService: validationService, + publishToWakuRelay: publishToWakuRelay, } s.ctx, s.ctxCancel = context.WithCancel(context.Background()) @@ -90,7 +100,17 @@ Start transaction (SERIALIZABLE isolation level) End transaction */ func (s *Service) PublishIdentityUpdate(ctx context.Context, req *api.PublishIdentityUpdateRequest) (*api.PublishIdentityUpdateResponse, error) { - return s.store.PublishIdentityUpdate(ctx, req, s.validationService) + res, err := s.store.PublishIdentityUpdate(ctx, req, s.validationService) + if err != nil { + return nil, err + } + + if err = s.PublishAssociationChangesEvent(ctx, res); err != nil { + s.log.Error("error publishing association changes event", zap.Error(err)) + // Don't return the erro here because the transaction has already been committed + } + + return &api.PublishIdentityUpdateResponse{}, nil } func (s *Service) GetIdentityUpdates(ctx context.Context, req *api.GetIdentityUpdatesRequest) (*api.GetIdentityUpdatesResponse, error) { @@ -145,6 +165,30 @@ func (s *Service) SubscribeAssociationChanges(req *identity.SubscribeAssociation return nil } +func (s *Service) PublishAssociationChangesEvent(ctx context.Context, identityUpdateResult *identityTypes.PublishIdentityUpdateResult) error { + protoEvents := identityUpdateResult.GetChanges() + if len(protoEvents) == 0 { + return nil + } + + for _, protoEvent := range protoEvents { + msgBytes, err := pb.Marshal(protoEvent) + if err != nil { + return err + } + + if err = s.publishToWakuRelay(ctx, &wakupb.WakuMessage{ + ContentTopic: topic.AssociationChangedTopic, + Timestamp: int64(identityUpdateResult.TimestampNs), + Payload: msgBytes, + }); err != nil { + return err + } + } + + return nil +} + func buildNatsSubjectForAssociationChanges() string { return envelopes.BuildNatsSubject(topic.BuildAssociationChangedTopic()) } diff --git a/pkg/identity/api/v1/identity_service_test.go b/pkg/identity/api/v1/identity_service_test.go index bbede814..5873588b 100644 --- a/pkg/identity/api/v1/identity_service_test.go +++ b/pkg/identity/api/v1/identity_service_test.go @@ -2,6 +2,7 @@ package api import ( "context" + "fmt" "testing" "time" @@ -9,14 +10,18 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/uptrace/bun" + wakupb "github.com/waku-org/go-waku/waku/v2/protocol/pb" mlsstore "github.com/xmtp/xmtp-node-go/pkg/mls/store" "github.com/xmtp/xmtp-node-go/pkg/mlsvalidate" identity "github.com/xmtp/xmtp-node-go/pkg/proto/identity/api/v1" associations "github.com/xmtp/xmtp-node-go/pkg/proto/identity/associations" mlsv1 "github.com/xmtp/xmtp-node-go/pkg/proto/mls/api/v1" test "github.com/xmtp/xmtp-node-go/pkg/testing" + pb "google.golang.org/protobuf/proto" ) +const INBOX_ID = "test_inbox" + type mockedMLSValidationService struct { mock.Mock } @@ -39,7 +44,7 @@ func (m *mockedMLSValidationService) GetAssociationState(ctx context.Context, ol new_members = append(new_members, &associations.MemberIdentifier{Kind: &associations.MemberIdentifier_Address{Address: "0x03"}}) out := mlsvalidate.AssociationStateResult{ - AssociationState: &associations.AssociationState{InboxId: "test_inbox", Members: member_map, RecoveryAddress: "recovery", SeenSignatures: [][]byte{[]byte("seen"), []byte("sig")}}, + AssociationState: &associations.AssociationState{InboxId: INBOX_ID, Members: member_map, RecoveryAddress: "recovery", SeenSignatures: [][]byte{[]byte("seen"), []byte("sig")}}, StateDiff: &associations.AssociationStateDiff{NewMembers: new_members, RemovedMembers: nil}, } return &out, nil @@ -83,10 +88,12 @@ func newTestService(t *testing.T, ctx context.Context) (*Service, *bun.DB, func( if !natsServer.ReadyForConnections(4 * time.Second) { t.Fail() } - require.NoError(t, err) mlsValidationService := newMockedValidationService() + publishToWakuRelay := func(ctx context.Context, msg *wakupb.WakuMessage) error { + return nil + } - svc, err := NewService(log, store, mlsValidationService, natsServer) + svc, err := NewService(log, store, mlsValidationService, natsServer, publishToWakuRelay) require.NoError(t, err) return svc, db, func() { @@ -271,3 +278,29 @@ func TestInboxSizeLimit(t *testing.T) { require.Equal(t, res.Responses[0].InboxId, inbox_id) require.Len(t, res.Responses[0].Updates, 256) } + +func TestPublishAssociationChanges(t *testing.T) { + ctx := context.Background() + svc, _, cleanup := newTestService(t, ctx) + defer cleanup() + + inboxId := test.RandomInboxId() + address := "test_address" + + numPublishedToWaku := 0 + svc.publishToWakuRelay = func(ctx context.Context, wakuMsg *wakupb.WakuMessage) error { + numPublishedToWaku++ + var msg identity.SubscribeAssociationChangesResponse + err := pb.Unmarshal(wakuMsg.Payload, &msg) + require.NoError(t, err) + require.Equal(t, msg.GetAccountAddressAssociation().InboxId, inboxId) + require.Equal(t, msg.GetAccountAddressAssociation().AccountAddress, fmt.Sprintf("0x0%d", numPublishedToWaku)) + return nil + } + + _, err := svc.PublishIdentityUpdate(ctx, publishIdentityUpdateRequest(inboxId, makeCreateInbox(address))) + require.NoError(t, err) + + // The mocked GetAssociationState always returns 3 new addresses + require.Equal(t, numPublishedToWaku, 3) +} diff --git a/pkg/identity/types/publish-result.go b/pkg/identity/types/publish-result.go new file mode 100644 index 00000000..8fc7bcc8 --- /dev/null +++ b/pkg/identity/types/publish-result.go @@ -0,0 +1,56 @@ +package types + +import ( + identity "github.com/xmtp/xmtp-node-go/pkg/proto/identity/api/v1" +) + +type PublishIdentityUpdateResult struct { + InboxID string + + NewAddresses []string + RevokedAddresses []string + NewInstallations [][]byte + RevokedInstallations [][]byte + TimestampNs uint64 +} + +func NewPublishIdentityUpdateResult(inboxID string, timestampNs uint64, newAddresses []string, revokedAddresses []string, newInstallations [][]byte, revokedInstallations [][]byte) *PublishIdentityUpdateResult { + return &PublishIdentityUpdateResult{ + InboxID: inboxID, + TimestampNs: timestampNs, + NewAddresses: newAddresses, + RevokedAddresses: revokedAddresses, + NewInstallations: newInstallations, + RevokedInstallations: revokedInstallations, + } +} + +func (p *PublishIdentityUpdateResult) GetChanges() []*identity.SubscribeAssociationChangesResponse { + out := make([]*identity.SubscribeAssociationChangesResponse, 0) + + for _, newAddress := range p.NewAddresses { + out = append(out, &identity.SubscribeAssociationChangesResponse{ + TimestampNs: p.TimestampNs, + Change: &identity.SubscribeAssociationChangesResponse_AccountAddressAssociation_{ + AccountAddressAssociation: &identity.SubscribeAssociationChangesResponse_AccountAddressAssociation{ + InboxId: p.InboxID, + AccountAddress: newAddress, + }, + }, + }) + } + + for _, revokedAddress := range p.RevokedAddresses { + out = append(out, &identity.SubscribeAssociationChangesResponse{ + TimestampNs: p.TimestampNs, + Change: &identity.SubscribeAssociationChangesResponse_AccountAddressRevocation_{ + AccountAddressRevocation: &identity.SubscribeAssociationChangesResponse_AccountAddressRevocation{ + InboxId: p.InboxID, + AccountAddress: revokedAddress, + }, + }, + }) + } + + return out +} diff --git a/pkg/mls/store/store.go b/pkg/mls/store/store.go index 75cfec87..bd016a94 100644 --- a/pkg/mls/store/store.go +++ b/pkg/mls/store/store.go @@ -10,6 +10,7 @@ import ( "github.com/uptrace/bun" "github.com/uptrace/bun/migrate" + identityTypes "github.com/xmtp/xmtp-node-go/pkg/identity/types" migrations "github.com/xmtp/xmtp-node-go/pkg/migrations/mls" queries "github.com/xmtp/xmtp-node-go/pkg/mls/store/queries" "github.com/xmtp/xmtp-node-go/pkg/mlsvalidate" @@ -31,7 +32,7 @@ type Store struct { } type IdentityStore interface { - PublishIdentityUpdate(ctx context.Context, req *identity.PublishIdentityUpdateRequest, validationService mlsvalidate.MLSValidationService) (*identity.PublishIdentityUpdateResponse, error) + PublishIdentityUpdate(ctx context.Context, req *identity.PublishIdentityUpdateRequest, validationService mlsvalidate.MLSValidationService) (*identityTypes.PublishIdentityUpdateResult, error) GetInboxLogs(ctx context.Context, req *identity.GetIdentityUpdatesRequest) (*identity.GetIdentityUpdatesResponse, error) GetInboxIds(ctx context.Context, req *identity.GetInboxIdsRequest) (*identity.GetInboxIdsResponse, error) } @@ -99,13 +100,21 @@ func (s *Store) GetInboxIds(ctx context.Context, req *identity.GetInboxIdsReques }, nil } -func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.PublishIdentityUpdateRequest, validationService mlsvalidate.MLSValidationService) (*identity.PublishIdentityUpdateResponse, error) { +func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.PublishIdentityUpdateRequest, validationService mlsvalidate.MLSValidationService) (*identityTypes.PublishIdentityUpdateResult, error) { newUpdate := req.GetIdentityUpdate() if newUpdate == nil { return nil, errors.New("IdentityUpdate is required") } + now := nowNs() + var newAccountAddresses []string + var revokedAccountAddresses []string + if err := s.RunInRepeatableReadTx(ctx, 3, func(ctx context.Context, txQueries *queries.Queries) error { + // Reset these lists to allow for safe retries of the TX + newAccountAddresses = make([]string, 0) + revokedAccountAddresses = make([]string, 0) + inboxId := newUpdate.GetInboxId() // We use a pg_advisory_lock to lock the inbox_id instead of SELECT FOR UPDATE // This allows the lock to be enforced even when there are no existing `inbox_log`s @@ -144,7 +153,7 @@ func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.Publish sequence_id, err := txQueries.InsertInboxLog(ctx, queries.InsertInboxLogParams{ InboxID: inboxId, - ServerTimestampNs: nowNs(), + ServerTimestampNs: now, IdentityUpdateProto: protoBytes, }) @@ -157,6 +166,7 @@ func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.Publish for _, new_member := range state.StateDiff.NewMembers { log.Info("New member", zap.Any("member", new_member)) if address, ok := new_member.Kind.(*associations.MemberIdentifier_Address); ok { + newAccountAddresses = append(newAccountAddresses, address.Address) _, err = txQueries.InsertAddressLog(ctx, queries.InsertAddressLogParams{ Address: address.Address, InboxID: inboxId, @@ -172,6 +182,7 @@ func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.Publish for _, removed_member := range state.StateDiff.RemovedMembers { log.Info("Removed member", zap.Any("member", removed_member)) if address, ok := removed_member.Kind.(*associations.MemberIdentifier_Address); ok { + revokedAccountAddresses = append(revokedAccountAddresses, address.Address) err = txQueries.RevokeAddressFromLog(ctx, queries.RevokeAddressFromLogParams{ Address: address.Address, InboxID: inboxId, @@ -188,7 +199,16 @@ func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.Publish return nil, err } - return &identity.PublishIdentityUpdateResponse{}, nil + result := identityTypes.NewPublishIdentityUpdateResult( + req.IdentityUpdate.InboxId, + uint64(now), + newAccountAddresses, + revokedAccountAddresses, + [][]byte{}, // TODO: Handle installations added + [][]byte{}, // TODO: Handle installations revoked + ) + + return result, nil } func (s *Store) GetInboxLogs(ctx context.Context, batched_req *identity.GetIdentityUpdatesRequest) (*identity.GetIdentityUpdatesResponse, error) { diff --git a/pkg/mocks/mock_IdentityApi_SubscribeAssociationChangesServer.go b/pkg/mocks/mock_IdentityApi_SubscribeAssociationChangesServer.go new file mode 100644 index 00000000..feacab31 --- /dev/null +++ b/pkg/mocks/mock_IdentityApi_SubscribeAssociationChangesServer.go @@ -0,0 +1,350 @@ +// Code generated by mockery v2.44.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + apiv1 "github.com/xmtp/xmtp-node-go/pkg/proto/identity/api/v1" + + metadata "google.golang.org/grpc/metadata" + + mock "github.com/stretchr/testify/mock" +) + +// MockIdentityApi_SubscribeAssociationChangesServer is an autogenerated mock type for the IdentityApi_SubscribeAssociationChangesServer type +type MockIdentityApi_SubscribeAssociationChangesServer struct { + mock.Mock +} + +type MockIdentityApi_SubscribeAssociationChangesServer_Expecter struct { + mock *mock.Mock +} + +func (_m *MockIdentityApi_SubscribeAssociationChangesServer) EXPECT() *MockIdentityApi_SubscribeAssociationChangesServer_Expecter { + return &MockIdentityApi_SubscribeAssociationChangesServer_Expecter{mock: &_m.Mock} +} + +// Context provides a mock function with given fields: +func (_m *MockIdentityApi_SubscribeAssociationChangesServer) Context() context.Context { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Context") + } + + var r0 context.Context + if rf, ok := ret.Get(0).(func() context.Context); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(context.Context) + } + } + + return r0 +} + +// MockIdentityApi_SubscribeAssociationChangesServer_Context_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Context' +type MockIdentityApi_SubscribeAssociationChangesServer_Context_Call struct { + *mock.Call +} + +// Context is a helper method to define mock.On call +func (_e *MockIdentityApi_SubscribeAssociationChangesServer_Expecter) Context() *MockIdentityApi_SubscribeAssociationChangesServer_Context_Call { + return &MockIdentityApi_SubscribeAssociationChangesServer_Context_Call{Call: _e.mock.On("Context")} +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_Context_Call) Run(run func()) *MockIdentityApi_SubscribeAssociationChangesServer_Context_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_Context_Call) Return(_a0 context.Context) *MockIdentityApi_SubscribeAssociationChangesServer_Context_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_Context_Call) RunAndReturn(run func() context.Context) *MockIdentityApi_SubscribeAssociationChangesServer_Context_Call { + _c.Call.Return(run) + return _c +} + +// RecvMsg provides a mock function with given fields: m +func (_m *MockIdentityApi_SubscribeAssociationChangesServer) RecvMsg(m interface{}) error { + ret := _m.Called(m) + + if len(ret) == 0 { + panic("no return value specified for RecvMsg") + } + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIdentityApi_SubscribeAssociationChangesServer_RecvMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecvMsg' +type MockIdentityApi_SubscribeAssociationChangesServer_RecvMsg_Call struct { + *mock.Call +} + +// RecvMsg is a helper method to define mock.On call +// - m interface{} +func (_e *MockIdentityApi_SubscribeAssociationChangesServer_Expecter) RecvMsg(m interface{}) *MockIdentityApi_SubscribeAssociationChangesServer_RecvMsg_Call { + return &MockIdentityApi_SubscribeAssociationChangesServer_RecvMsg_Call{Call: _e.mock.On("RecvMsg", m)} +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_RecvMsg_Call) Run(run func(m interface{})) *MockIdentityApi_SubscribeAssociationChangesServer_RecvMsg_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(interface{})) + }) + return _c +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_RecvMsg_Call) Return(_a0 error) *MockIdentityApi_SubscribeAssociationChangesServer_RecvMsg_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_RecvMsg_Call) RunAndReturn(run func(interface{}) error) *MockIdentityApi_SubscribeAssociationChangesServer_RecvMsg_Call { + _c.Call.Return(run) + return _c +} + +// Send provides a mock function with given fields: _a0 +func (_m *MockIdentityApi_SubscribeAssociationChangesServer) Send(_a0 *apiv1.SubscribeAssociationChangesResponse) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for Send") + } + + var r0 error + if rf, ok := ret.Get(0).(func(*apiv1.SubscribeAssociationChangesResponse) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIdentityApi_SubscribeAssociationChangesServer_Send_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Send' +type MockIdentityApi_SubscribeAssociationChangesServer_Send_Call struct { + *mock.Call +} + +// Send is a helper method to define mock.On call +// - _a0 *apiv1.SubscribeAssociationChangesResponse +func (_e *MockIdentityApi_SubscribeAssociationChangesServer_Expecter) Send(_a0 interface{}) *MockIdentityApi_SubscribeAssociationChangesServer_Send_Call { + return &MockIdentityApi_SubscribeAssociationChangesServer_Send_Call{Call: _e.mock.On("Send", _a0)} +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_Send_Call) Run(run func(_a0 *apiv1.SubscribeAssociationChangesResponse)) *MockIdentityApi_SubscribeAssociationChangesServer_Send_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*apiv1.SubscribeAssociationChangesResponse)) + }) + return _c +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_Send_Call) Return(_a0 error) *MockIdentityApi_SubscribeAssociationChangesServer_Send_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_Send_Call) RunAndReturn(run func(*apiv1.SubscribeAssociationChangesResponse) error) *MockIdentityApi_SubscribeAssociationChangesServer_Send_Call { + _c.Call.Return(run) + return _c +} + +// SendHeader provides a mock function with given fields: _a0 +func (_m *MockIdentityApi_SubscribeAssociationChangesServer) SendHeader(_a0 metadata.MD) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for SendHeader") + } + + var r0 error + if rf, ok := ret.Get(0).(func(metadata.MD) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIdentityApi_SubscribeAssociationChangesServer_SendHeader_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendHeader' +type MockIdentityApi_SubscribeAssociationChangesServer_SendHeader_Call struct { + *mock.Call +} + +// SendHeader is a helper method to define mock.On call +// - _a0 metadata.MD +func (_e *MockIdentityApi_SubscribeAssociationChangesServer_Expecter) SendHeader(_a0 interface{}) *MockIdentityApi_SubscribeAssociationChangesServer_SendHeader_Call { + return &MockIdentityApi_SubscribeAssociationChangesServer_SendHeader_Call{Call: _e.mock.On("SendHeader", _a0)} +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_SendHeader_Call) Run(run func(_a0 metadata.MD)) *MockIdentityApi_SubscribeAssociationChangesServer_SendHeader_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(metadata.MD)) + }) + return _c +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_SendHeader_Call) Return(_a0 error) *MockIdentityApi_SubscribeAssociationChangesServer_SendHeader_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_SendHeader_Call) RunAndReturn(run func(metadata.MD) error) *MockIdentityApi_SubscribeAssociationChangesServer_SendHeader_Call { + _c.Call.Return(run) + return _c +} + +// SendMsg provides a mock function with given fields: m +func (_m *MockIdentityApi_SubscribeAssociationChangesServer) SendMsg(m interface{}) error { + ret := _m.Called(m) + + if len(ret) == 0 { + panic("no return value specified for SendMsg") + } + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIdentityApi_SubscribeAssociationChangesServer_SendMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendMsg' +type MockIdentityApi_SubscribeAssociationChangesServer_SendMsg_Call struct { + *mock.Call +} + +// SendMsg is a helper method to define mock.On call +// - m interface{} +func (_e *MockIdentityApi_SubscribeAssociationChangesServer_Expecter) SendMsg(m interface{}) *MockIdentityApi_SubscribeAssociationChangesServer_SendMsg_Call { + return &MockIdentityApi_SubscribeAssociationChangesServer_SendMsg_Call{Call: _e.mock.On("SendMsg", m)} +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_SendMsg_Call) Run(run func(m interface{})) *MockIdentityApi_SubscribeAssociationChangesServer_SendMsg_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(interface{})) + }) + return _c +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_SendMsg_Call) Return(_a0 error) *MockIdentityApi_SubscribeAssociationChangesServer_SendMsg_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_SendMsg_Call) RunAndReturn(run func(interface{}) error) *MockIdentityApi_SubscribeAssociationChangesServer_SendMsg_Call { + _c.Call.Return(run) + return _c +} + +// SetHeader provides a mock function with given fields: _a0 +func (_m *MockIdentityApi_SubscribeAssociationChangesServer) SetHeader(_a0 metadata.MD) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for SetHeader") + } + + var r0 error + if rf, ok := ret.Get(0).(func(metadata.MD) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIdentityApi_SubscribeAssociationChangesServer_SetHeader_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetHeader' +type MockIdentityApi_SubscribeAssociationChangesServer_SetHeader_Call struct { + *mock.Call +} + +// SetHeader is a helper method to define mock.On call +// - _a0 metadata.MD +func (_e *MockIdentityApi_SubscribeAssociationChangesServer_Expecter) SetHeader(_a0 interface{}) *MockIdentityApi_SubscribeAssociationChangesServer_SetHeader_Call { + return &MockIdentityApi_SubscribeAssociationChangesServer_SetHeader_Call{Call: _e.mock.On("SetHeader", _a0)} +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_SetHeader_Call) Run(run func(_a0 metadata.MD)) *MockIdentityApi_SubscribeAssociationChangesServer_SetHeader_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(metadata.MD)) + }) + return _c +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_SetHeader_Call) Return(_a0 error) *MockIdentityApi_SubscribeAssociationChangesServer_SetHeader_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_SetHeader_Call) RunAndReturn(run func(metadata.MD) error) *MockIdentityApi_SubscribeAssociationChangesServer_SetHeader_Call { + _c.Call.Return(run) + return _c +} + +// SetTrailer provides a mock function with given fields: _a0 +func (_m *MockIdentityApi_SubscribeAssociationChangesServer) SetTrailer(_a0 metadata.MD) { + _m.Called(_a0) +} + +// MockIdentityApi_SubscribeAssociationChangesServer_SetTrailer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetTrailer' +type MockIdentityApi_SubscribeAssociationChangesServer_SetTrailer_Call struct { + *mock.Call +} + +// SetTrailer is a helper method to define mock.On call +// - _a0 metadata.MD +func (_e *MockIdentityApi_SubscribeAssociationChangesServer_Expecter) SetTrailer(_a0 interface{}) *MockIdentityApi_SubscribeAssociationChangesServer_SetTrailer_Call { + return &MockIdentityApi_SubscribeAssociationChangesServer_SetTrailer_Call{Call: _e.mock.On("SetTrailer", _a0)} +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_SetTrailer_Call) Run(run func(_a0 metadata.MD)) *MockIdentityApi_SubscribeAssociationChangesServer_SetTrailer_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(metadata.MD)) + }) + return _c +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_SetTrailer_Call) Return() *MockIdentityApi_SubscribeAssociationChangesServer_SetTrailer_Call { + _c.Call.Return() + return _c +} + +func (_c *MockIdentityApi_SubscribeAssociationChangesServer_SetTrailer_Call) RunAndReturn(run func(metadata.MD)) *MockIdentityApi_SubscribeAssociationChangesServer_SetTrailer_Call { + _c.Call.Return(run) + return _c +} + +// NewMockIdentityApi_SubscribeAssociationChangesServer creates a new instance of MockIdentityApi_SubscribeAssociationChangesServer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockIdentityApi_SubscribeAssociationChangesServer(t interface { + mock.TestingT + Cleanup(func()) +}) *MockIdentityApi_SubscribeAssociationChangesServer { + mock := &MockIdentityApi_SubscribeAssociationChangesServer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}