diff --git a/go.mod b/go.mod index d63a5e7e..17cfd010 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/uptrace/bun/driver/pgdriver v1.1.16 github.com/waku-org/go-waku v0.8.0 github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3 - github.com/xmtp/proto/v3 v3.36.3-0.20240111115707-f2e60758fa55 + github.com/xmtp/proto/v3 v3.36.3-0.20240111120253-eacdbc385fcb github.com/yoheimuta/protolint v0.39.0 go.uber.org/zap v1.24.0 golang.org/x/sync v0.3.0 diff --git a/go.sum b/go.sum index 1988e8bd..f108932a 100644 --- a/go.sum +++ b/go.sum @@ -1146,8 +1146,8 @@ github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0 github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3 h1:wzUffJGCTBGXIDyNU+1UBu1fn2Nzo+OQzM1pLrheh58= github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3/go.mod h1:bJREWk+NDnZYjgLQdAi8SUWuq/5pkMme4GqiffEhUF4= -github.com/xmtp/proto/v3 v3.36.3-0.20240111115707-f2e60758fa55 h1:EyaL20B1UNMLJ+sCGWLNlUukyO/x01g5LtHL9KDGutU= -github.com/xmtp/proto/v3 v3.36.3-0.20240111115707-f2e60758fa55/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY= +github.com/xmtp/proto/v3 v3.36.3-0.20240111120253-eacdbc385fcb h1:kb8GdYfxYun1UPbMLA1P9/mG9hxj9dJH+gCROivBTJw= +github.com/xmtp/proto/v3 v3.36.3-0.20240111120253-eacdbc385fcb/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/yoheimuta/go-protoparser/v4 v4.6.0 h1:uvz1e9/5Ihsm4Ku8AJeDImTpirKmIxubZdSn0QJNdnw= github.com/yoheimuta/go-protoparser/v4 v4.6.0/go.mod h1:AHNNnSWnb0UoL4QgHPiOAg2BniQceFscPI5X/BZNHl8= diff --git a/pkg/mls/api/v1/service.go b/pkg/mls/api/v1/service.go index 8faeefb0..62e30a3c 100644 --- a/pkg/mls/api/v1/service.go +++ b/pkg/mls/api/v1/service.go @@ -6,7 +6,6 @@ import ( wakunode "github.com/waku-org/go-waku/waku/v2/node" wakupb "github.com/waku-org/go-waku/waku/v2/protocol/pb" proto "github.com/xmtp/proto/v3/go/mls/api/v1" - "github.com/xmtp/proto/v3/go/mls/message_contents" mlsstore "github.com/xmtp/xmtp-node-go/pkg/mls/store" "github.com/xmtp/xmtp-node-go/pkg/mlsvalidate" "github.com/xmtp/xmtp-node-go/pkg/topic" @@ -222,58 +221,11 @@ func (s *Service) SendWelcomeMessages(ctx context.Context, req *proto.SendWelcom } func (s *Service) QueryGroupMessages(ctx context.Context, req *proto.QueryGroupMessagesRequest) (*proto.QueryGroupMessagesResponse, error) { - msgs, err := s.store.QueryGroupMessages(ctx, req) - if err != nil { - return nil, err - } - - messages := make([]*message_contents.GroupMessage, 0, len(msgs)) - for _, msg := range msgs { - messages = append(messages, &message_contents.GroupMessage{ - Version: &message_contents.GroupMessage_V1_{ - V1: &message_contents.GroupMessage_V1{ - Id: msg.Id, - CreatedNs: uint64(msg.CreatedAt.UnixNano()), - GroupId: msg.GroupId, - Data: msg.Data, - }, - }, - }) - } - - // TODO(snormore): build and return paging info - - return &proto.QueryGroupMessagesResponse{ - Messages: messages, - PagingInfo: nil, - }, nil + return s.store.QueryGroupMessagesV1(ctx, req) } func (s *Service) QueryWelcomeMessages(ctx context.Context, req *proto.QueryWelcomeMessagesRequest) (*proto.QueryWelcomeMessagesResponse, error) { - msgs, err := s.store.QueryWelcomeMessages(ctx, req) - if err != nil { - return nil, err - } - - messages := make([]*message_contents.WelcomeMessage, 0, len(msgs)) - for _, msg := range msgs { - messages = append(messages, &message_contents.WelcomeMessage{ - Version: &message_contents.WelcomeMessage_V1_{ - V1: &message_contents.WelcomeMessage_V1{ - Id: msg.Id, - CreatedNs: uint64(msg.CreatedAt.UnixNano()), - Data: msg.Data, - }, - }, - }) - } - - // TODO(snormore): build and return paging info - - return &proto.QueryWelcomeMessagesResponse{ - Messages: messages, - PagingInfo: nil, - }, nil + return s.store.QueryWelcomeMessagesV1(ctx, req) } func buildIdentityUpdate(update mlsstore.IdentityUpdate) *proto.GetIdentityUpdatesResponse_Update { diff --git a/pkg/mls/api/v1/service_test.go b/pkg/mls/api/v1/service_test.go index 002b6ce7..07bdff55 100644 --- a/pkg/mls/api/v1/service_test.go +++ b/pkg/mls/api/v1/service_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/uptrace/bun" mlsv1 "github.com/xmtp/proto/v3/go/mls/api/v1" - messageContents "github.com/xmtp/proto/v3/go/mls/message_contents" + "github.com/xmtp/proto/v3/go/mls/message_contents" mlsstore "github.com/xmtp/xmtp-node-go/pkg/mls/store" "github.com/xmtp/xmtp-node-go/pkg/mlsvalidate" test "github.com/xmtp/xmtp-node-go/pkg/testing" @@ -230,9 +230,9 @@ func TestSendGroupMessages(t *testing.T) { mlsValidationService.mockValidateGroupMessages(groupId) _, err := svc.SendGroupMessages(ctx, &mlsv1.SendGroupMessagesRequest{ - Messages: []*messageContents.GroupMessage{{ - Version: &messageContents.GroupMessage_V1_{ - V1: &messageContents.GroupMessage_V1{ + Messages: []*message_contents.GroupMessage{{ + Version: &message_contents.GroupMessage_V1_{ + V1: &message_contents.GroupMessage_V1{ Id: 1, CreatedNs: 1, GroupId: "group", @@ -243,13 +243,47 @@ func TestSendGroupMessages(t *testing.T) { }) require.NoError(t, err) - msgs, err := svc.store.QueryGroupMessages(ctx, &mlsv1.QueryGroupMessagesRequest{ + resp, err := svc.store.QueryGroupMessagesV1(ctx, &mlsv1.QueryGroupMessagesRequest{ GroupId: groupId, }) require.NoError(t, err) - require.Len(t, msgs, 1) - require.Equal(t, msgs[0].Data, []byte("test")) - require.NotNil(t, msgs[0].CreatedAt) + require.Len(t, resp.Messages, 1) + require.Equal(t, resp.Messages[0].GetV1().Data, []byte("test")) + require.NotEmpty(t, resp.Messages[0].GetV1().CreatedNs) +} + +func TestSendWelcomeMessages(t *testing.T) { + ctx := context.Background() + svc, _, _, cleanup := newTestService(t, ctx) + defer cleanup() + + installationId := test.RandomString(32) + + _, err := svc.SendWelcomeMessages(ctx, &mlsv1.SendWelcomeMessagesRequest{ + WelcomeMessages: []*mlsv1.SendWelcomeMessagesRequest_WelcomeMessageRequest{ + { + InstallationId: []byte(installationId), + WelcomeMessage: &message_contents.WelcomeMessage{ + Version: &message_contents.WelcomeMessage_V1_{ + V1: &message_contents.WelcomeMessage_V1{ + Id: 1, + CreatedNs: 1, + Data: []byte("test"), + }, + }, + }, + }, + }, + }) + require.NoError(t, err) + + resp, err := svc.store.QueryWelcomeMessagesV1(ctx, &mlsv1.QueryWelcomeMessagesRequest{ + InstallationId: installationId, + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 1) + require.Equal(t, resp.Messages[0].GetV1().Data, []byte("test")) + require.NotEmpty(t, resp.Messages[0].GetV1().CreatedNs) } func TestGetIdentityUpdates(t *testing.T) { diff --git a/pkg/mls/store/store.go b/pkg/mls/store/store.go index 4f12ed1c..e4067d54 100644 --- a/pkg/mls/store/store.go +++ b/pkg/mls/store/store.go @@ -9,6 +9,7 @@ import ( "github.com/uptrace/bun" "github.com/uptrace/bun/migrate" mlsv1 "github.com/xmtp/proto/v3/go/mls/api/v1" + "github.com/xmtp/proto/v3/go/mls/message_contents" migrations "github.com/xmtp/xmtp-node-go/pkg/migrations/mls" "go.uber.org/zap" ) @@ -27,9 +28,9 @@ type MlsStore interface { FetchKeyPackages(ctx context.Context, installationIds [][]byte) ([]*Installation, error) GetIdentityUpdates(ctx context.Context, walletAddresses []string, startTimeNs int64) (map[string]IdentityUpdateList, error) InsertGroupMessage(ctx context.Context, groupId string, data []byte) (*GroupMessage, error) - QueryGroupMessages(ctx context.Context, query *mlsv1.QueryGroupMessagesRequest) ([]*GroupMessage, error) InsertWelcomeMessage(ctx context.Context, installationId string, data []byte) (*WelcomeMessage, error) - QueryWelcomeMessages(ctx context.Context, query *mlsv1.QueryWelcomeMessagesRequest) ([]*WelcomeMessage, error) + QueryGroupMessagesV1(ctx context.Context, query *mlsv1.QueryGroupMessagesRequest) (*mlsv1.QueryGroupMessagesResponse, error) + QueryWelcomeMessagesV1(ctx context.Context, query *mlsv1.QueryWelcomeMessagesRequest) (*mlsv1.QueryWelcomeMessagesResponse, error) } func New(ctx context.Context, config Config) (*Store, error) { @@ -188,107 +189,6 @@ func (s *Store) InsertGroupMessage(ctx context.Context, groupId string, data []b return &message, nil } -func (s *Store) QueryGroupMessages(ctx context.Context, req *mlsv1.QueryGroupMessagesRequest) ([]*GroupMessage, error) { - msgs := make([]*GroupMessage, 0) - - err := s.db.NewSelect(). - Model(&msgs). - Where("group_id = ?", req.GroupId). - Order("created_at DESC"). - Scan(ctx) - if err != nil { - return nil, err - } - - return msgs, nil - - // messages := make([]*GroupMessage, 0) - - // if len(query.ContentTopics) == 0 { - // return nil, errors.New("topic is required") - // } - - // if len(query.ContentTopics) > 1 { - // return nil, errors.New("multiple topics not supported") - // } - - // q := s.db.NewSelect(). - // Model(&messages). - // Where("topic = ?", query.ContentTopics[0]) - - // direction := messagev1.SortDirection_SORT_DIRECTION_DESCENDING - // if query.PagingInfo != nil && query.PagingInfo.Direction != messagev1.SortDirection_SORT_DIRECTION_UNSPECIFIED { - // direction = query.PagingInfo.Direction - // } - // switch direction { - // case messagev1.SortDirection_SORT_DIRECTION_DESCENDING: - // q = q.Order("tid DESC") - // case messagev1.SortDirection_SORT_DIRECTION_ASCENDING: - // q = q.Order("tid ASC") - // } - - // pageSize := maxPageSize - // if query.PagingInfo != nil && query.PagingInfo.Limit > 0 && query.PagingInfo.Limit <= maxPageSize { - // pageSize = int(query.PagingInfo.Limit) - // } - // q = q.Limit(pageSize) - - // if query.PagingInfo != nil && query.PagingInfo.GetCursor() != nil && query.PagingInfo.GetCursor().GetIndex() != nil { - // index := query.PagingInfo.GetCursor().GetIndex() - // if index.SenderTimeNs == 0 || len(index.Digest) == 0 { - // return nil, errors.New("invalid cursor") - // } - // cursorTID := buildMessageTIDFromCursor(index) - // if direction == messagev1.SortDirection_SORT_DIRECTION_ASCENDING { - // q = q.Where("tid > ?", cursorTID) - // } else { - // q = q.Where("tid < ?", cursorTID) - // } - // } else { - // if query.StartTimeNs > 0 { - // q = q.Where("created_at >= ?", query.StartTimeNs) - // } - // if query.EndTimeNs > 0 { - // q = q.Where("created_at <= ?", query.EndTimeNs) - // } - // } - - // err := q.Scan(ctx) - // if err != nil { - // return nil, err - // } - - // envs := make([]*messagev1.Envelope, 0, len(messages)) - // for _, msg := range messages { - // envs = append(envs, &messagev1.Envelope{ - // ContentTopic: msg.Topic, - // TimestampNs: uint64(msg.CreatedAt), - // Message: msg.Content, - // }) - // } - - // pagingInfo := &messagev1.PagingInfo{Limit: 0, Cursor: nil, Direction: direction} - // if len(envs) >= pageSize { - // if len(envs) > 0 { - // lastEnv := envs[len(envs)-1] - // digest := buildMessageContentDigest(lastEnv.Message) - // pagingInfo.Cursor = &messagev1.Cursor{ - // Cursor: &messagev1.Cursor_Index{ - // Index: &messagev1.IndexCursor{ - // Digest: digest[:], - // SenderTimeNs: lastEnv.TimestampNs, - // }, - // }, - // } - // } - // } - - // return &messagev1.QueryResponse{ - // Envelopes: envs, - // PagingInfo: pagingInfo, - // }, nil -} - func (s *Store) InsertWelcomeMessage(ctx context.Context, installationId string, data []byte) (*WelcomeMessage, error) { message := WelcomeMessage{ Data: data, @@ -308,19 +208,141 @@ func (s *Store) InsertWelcomeMessage(ctx context.Context, installationId string, return &message, nil } -func (s *Store) QueryWelcomeMessages(ctx context.Context, req *mlsv1.QueryWelcomeMessagesRequest) ([]*WelcomeMessage, error) { +func (s *Store) QueryGroupMessagesV1(ctx context.Context, req *mlsv1.QueryGroupMessagesRequest) (*mlsv1.QueryGroupMessagesResponse, error) { + msgs := make([]*GroupMessage, 0) + + if req.GroupId == "" { + return nil, errors.New("group is required") + } + + q := s.db.NewSelect(). + Model(&msgs). + Where("group_id = ?", req.GroupId) + + direction := mlsv1.SortDirection_SORT_DIRECTION_DESCENDING + if req.PagingInfo != nil && req.PagingInfo.Direction != mlsv1.SortDirection_SORT_DIRECTION_UNSPECIFIED { + direction = req.PagingInfo.Direction + } + switch direction { + case mlsv1.SortDirection_SORT_DIRECTION_DESCENDING: + q = q.Order("id DESC") + case mlsv1.SortDirection_SORT_DIRECTION_ASCENDING: + q = q.Order("id ASC") + } + + pageSize := maxPageSize + if req.PagingInfo != nil && req.PagingInfo.Limit > 0 && req.PagingInfo.Limit <= maxPageSize { + pageSize = int(req.PagingInfo.Limit) + } + q = q.Limit(pageSize) + + if req.PagingInfo != nil && req.PagingInfo.Cursor != 0 { + if direction == mlsv1.SortDirection_SORT_DIRECTION_ASCENDING { + q = q.Where("id > ?", req.PagingInfo.Cursor) + } else { + q = q.Where("id < ?", req.PagingInfo.Cursor) + } + } + + err := q.Scan(ctx) + if err != nil { + return nil, err + } + + messages := make([]*message_contents.GroupMessage, 0, len(msgs)) + for _, msg := range msgs { + messages = append(messages, &message_contents.GroupMessage{ + Version: &message_contents.GroupMessage_V1_{ + V1: &message_contents.GroupMessage_V1{ + Id: msg.Id, + CreatedNs: uint64(msg.CreatedAt.UnixNano()), + GroupId: msg.GroupId, + Data: msg.Data, + }, + }, + }) + } + + pagingInfo := &mlsv1.PagingInfo{Limit: 0, Cursor: 0, Direction: direction} + if len(messages) >= pageSize { + if len(messages) > 0 { + lastMsg := msgs[len(messages)-1] + pagingInfo.Cursor = lastMsg.Id + } + } + + return &mlsv1.QueryGroupMessagesResponse{ + Messages: messages, + PagingInfo: pagingInfo, + }, nil +} + +func (s *Store) QueryWelcomeMessagesV1(ctx context.Context, req *mlsv1.QueryWelcomeMessagesRequest) (*mlsv1.QueryWelcomeMessagesResponse, error) { msgs := make([]*WelcomeMessage, 0) - err := s.db.NewSelect(). + if req.InstallationId == "" { + return nil, errors.New("installation is required") + } + + q := s.db.NewSelect(). Model(&msgs). - Where("installation_id = ?", req.InstallationId). - Order("created_at DESC"). - Scan(ctx) + Where("installation_id = ?", req.InstallationId) + + direction := mlsv1.SortDirection_SORT_DIRECTION_DESCENDING + if req.PagingInfo != nil && req.PagingInfo.Direction != mlsv1.SortDirection_SORT_DIRECTION_UNSPECIFIED { + direction = req.PagingInfo.Direction + } + switch direction { + case mlsv1.SortDirection_SORT_DIRECTION_DESCENDING: + q = q.Order("id DESC") + case mlsv1.SortDirection_SORT_DIRECTION_ASCENDING: + q = q.Order("id ASC") + } + + pageSize := maxPageSize + if req.PagingInfo != nil && req.PagingInfo.Limit > 0 && req.PagingInfo.Limit <= maxPageSize { + pageSize = int(req.PagingInfo.Limit) + } + q = q.Limit(pageSize) + + if req.PagingInfo != nil && req.PagingInfo.Cursor != 0 { + if direction == mlsv1.SortDirection_SORT_DIRECTION_ASCENDING { + q = q.Where("id > ?", req.PagingInfo.Cursor) + } else { + q = q.Where("id < ?", req.PagingInfo.Cursor) + } + } + + err := q.Scan(ctx) if err != nil { return nil, err } - return msgs, nil + messages := make([]*message_contents.WelcomeMessage, 0, len(msgs)) + for _, msg := range msgs { + messages = append(messages, &message_contents.WelcomeMessage{ + Version: &message_contents.WelcomeMessage_V1_{ + V1: &message_contents.WelcomeMessage_V1{ + Id: msg.Id, + CreatedNs: uint64(msg.CreatedAt.UnixNano()), + Data: msg.Data, + }, + }, + }) + } + + pagingInfo := &mlsv1.PagingInfo{Limit: 0, Cursor: 0, Direction: direction} + if len(messages) >= pageSize { + if len(messages) > 0 { + lastMsg := msgs[len(messages)-1] + pagingInfo.Cursor = lastMsg.Id + } + } + + return &mlsv1.QueryWelcomeMessagesResponse{ + Messages: messages, + PagingInfo: pagingInfo, + }, nil } func (s *Store) migrate(ctx context.Context) error { diff --git a/pkg/mls/store/store_test.go b/pkg/mls/store/store_test.go index 5444b1c2..00897cd7 100644 --- a/pkg/mls/store/store_test.go +++ b/pkg/mls/store/store_test.go @@ -7,32 +7,17 @@ import ( "time" "github.com/stretchr/testify/require" + mlsv1 "github.com/xmtp/proto/v3/go/mls/api/v1" test "github.com/xmtp/xmtp-node-go/pkg/testing" ) -type testStoreOption func(*Config) - -func withFixedNow(now time.Time) testStoreOption { - return func(c *Config) { - c.now = func() time.Time { - return now - } - } -} - -func NewTestStore(t *testing.T, opts ...testStoreOption) (*Store, func()) { +func NewTestStore(t *testing.T) (*Store, func()) { log := test.NewLog(t) db, _, dbCleanup := test.NewMLSDB(t) ctx := context.Background() c := Config{ Log: log, DB: db, - now: func() time.Time { - return time.Now().UTC() - }, - } - for _, opt := range opts { - opt(&c) } store, err := New(ctx, c) @@ -189,363 +174,393 @@ func TestIdentityUpdateSort(t *testing.T) { require.Equal(t, updates[2].TimestampNs, uint64(3)) } -// TODO(snormore): implement these tests - -// func TestInsertMessage_Single(t *testing.T) { -// now := time.Now().UTC() -// store, cleanup := NewTestStore(t, withFixedNow(now)) -// defer cleanup() - -// ctx := context.Background() -// env, err := store.InsertMessage(ctx, "topic", []byte("content")) -// require.NoError(t, err) -// require.NotNil(t, env) -// require.Equal(t, &messagev1.Envelope{ -// ContentTopic: "topic", -// TimestampNs: uint64(now.UnixNano()), -// Message: []byte("content"), -// }, env) - -// msgs := make([]*Message, 0) -// err = store.db.NewSelect().Model(&msgs).Scan(ctx) -// require.NoError(t, err) -// require.Len(t, msgs, 1) -// require.Equal(t, &Message{ -// Topic: "topic", -// CreatedAt: now.UnixNano(), -// Content: []byte("content"), -// TID: buildMessageTID(now, []byte("content")), -// }, msgs[0]) -// } - -// func TestInsertMessage_Duplicate(t *testing.T) { -// now := time.Now().UTC() -// store, cleanup := NewTestStore(t, withFixedNow(now)) -// defer cleanup() - -// ctx := context.Background() -// _, err := store.InsertMessage(ctx, "topic", []byte("content")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic", []byte("content")) -// require.NoError(t, err) - -// msgs := make([]*Message, 0) -// err = store.db.NewSelect().Model(&msgs).Scan(ctx) -// require.NoError(t, err) -// require.Len(t, msgs, 1) -// require.Equal(t, &Message{ -// Topic: "topic", -// CreatedAt: now.UnixNano(), -// Content: []byte("content"), -// TID: buildMessageTID(now, []byte("content")), -// }, msgs[0]) -// } - -// func TestInsertMessage_ManyAreOrderedByTime(t *testing.T) { -// store, cleanup := NewTestStore(t) -// defer cleanup() - -// ctx := context.Background() -// _, err := store.InsertMessage(ctx, "topic", []byte("content1")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic", []byte("content2")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic", []byte("content3")) -// require.NoError(t, err) - -// msgs := make([]*Message, 0) -// err = store.db.NewSelect().Model(&msgs).Order("tid DESC").Scan(ctx) -// require.NoError(t, err) -// require.Len(t, msgs, 3) -// require.Equal(t, []byte("content3"), msgs[0].Content) -// require.Equal(t, []byte("content2"), msgs[1].Content) -// require.Equal(t, []byte("content1"), msgs[2].Content) -// } - -// func TestQueryMessages_MissingTopic(t *testing.T) { -// store, cleanup := NewTestStore(t) -// defer cleanup() - -// ctx := context.Background() - -// resp, err := store.QueryMessages(ctx, &messagev1.QueryRequest{}) -// require.EqualError(t, err, "topic is required") -// require.Nil(t, resp) -// } - -// func TestQueryMessages_Filter(t *testing.T) { -// store, cleanup := NewTestStore(t) -// defer cleanup() - -// ctx := context.Background() -// _, err := store.InsertMessage(ctx, "topic1", []byte("content1")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic2", []byte("content2")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic1", []byte("content3")) -// require.NoError(t, err) - -// resp, err := store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"unknown"}, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 0) - -// resp, err = store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 2) -// require.Equal(t, []byte("content3"), resp.Envelopes[0].Message) -// require.Equal(t, []byte("content1"), resp.Envelopes[1].Message) - -// resp, err = store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic2"}, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 1) -// require.Equal(t, []byte("content2"), resp.Envelopes[0].Message) - -// // Sort ascending -// resp, err = store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// PagingInfo: &messagev1.PagingInfo{ -// Direction: messagev1.SortDirection_SORT_DIRECTION_ASCENDING, -// }, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 2) -// require.Equal(t, []byte("content1"), resp.Envelopes[0].Message) -// require.Equal(t, []byte("content3"), resp.Envelopes[1].Message) -// } - -// func TestQueryMessages_Paginate_Cursor(t *testing.T) { -// store, cleanup := NewTestStore(t) -// defer cleanup() - -// ctx := context.Background() -// _, err := store.InsertMessage(ctx, "topic1", []byte("content1")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic2", []byte("content2")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic1", []byte("content3")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic2", []byte("content4")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic1", []byte("content5")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic1", []byte("content6")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic1", []byte("content7")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic1", []byte("content8")) -// require.NoError(t, err) - -// resp, err := store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 6) -// require.Equal(t, []byte("content8"), resp.Envelopes[0].Message) -// require.Equal(t, []byte("content7"), resp.Envelopes[1].Message) -// require.Equal(t, []byte("content6"), resp.Envelopes[2].Message) -// require.Equal(t, []byte("content5"), resp.Envelopes[3].Message) -// require.Equal(t, []byte("content3"), resp.Envelopes[4].Message) -// require.Equal(t, []byte("content1"), resp.Envelopes[5].Message) - -// thirdEnv := resp.Envelopes[2] -// fifthEnv := resp.Envelopes[4] - -// resp, err = store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// PagingInfo: &messagev1.PagingInfo{ -// Limit: 2, -// }, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 2) -// require.Equal(t, []byte("content8"), resp.Envelopes[0].Message) -// require.Equal(t, []byte("content7"), resp.Envelopes[1].Message) - -// // Order descending by default -// resp, err = store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// PagingInfo: &messagev1.PagingInfo{ -// Limit: 2, -// Cursor: &messagev1.Cursor{ -// Cursor: &messagev1.Cursor_Index{ -// Index: &messagev1.IndexCursor{ -// SenderTimeNs: thirdEnv.TimestampNs, -// Digest: buildMessageContentDigest(thirdEnv.Message), -// }, -// }, -// }, -// }, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 2) -// require.Equal(t, []byte("content5"), resp.Envelopes[0].Message) -// require.Equal(t, []byte("content3"), resp.Envelopes[1].Message) - -// // Next page from previous response -// resp, err = store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// PagingInfo: resp.PagingInfo, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 1) -// require.Equal(t, []byte("content1"), resp.Envelopes[0].Message) - -// // Order ascending -// resp, err = store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// PagingInfo: &messagev1.PagingInfo{ -// Limit: 2, -// Direction: messagev1.SortDirection_SORT_DIRECTION_ASCENDING, -// Cursor: &messagev1.Cursor{ -// Cursor: &messagev1.Cursor_Index{ -// Index: &messagev1.IndexCursor{ -// SenderTimeNs: fifthEnv.TimestampNs, -// Digest: buildMessageContentDigest(fifthEnv.Message), -// }, -// }, -// }, -// }, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 2) -// require.Equal(t, []byte("content5"), resp.Envelopes[0].Message) -// require.Equal(t, []byte("content6"), resp.Envelopes[1].Message) - -// // Next page from previous response -// resp, err = store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// PagingInfo: resp.PagingInfo, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 2) -// require.Equal(t, []byte("content7"), resp.Envelopes[0].Message) -// require.Equal(t, []byte("content8"), resp.Envelopes[1].Message) -// } - -// func TestQueryMessages_Paginate_StartEndTime(t *testing.T) { -// store, cleanup := NewTestStore(t) -// defer cleanup() - -// ctx := context.Background() -// _, err := store.InsertMessage(ctx, "topic1", []byte("content1")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic2", []byte("content2")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic1", []byte("content3")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic2", []byte("content4")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic1", []byte("content5")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic1", []byte("content6")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic1", []byte("content7")) -// require.NoError(t, err) -// _, err = store.InsertMessage(ctx, "topic1", []byte("content8")) -// require.NoError(t, err) - -// resp, err := store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 6) -// require.Equal(t, []byte("content8"), resp.Envelopes[0].Message) -// require.Equal(t, []byte("content7"), resp.Envelopes[1].Message) -// require.Equal(t, []byte("content6"), resp.Envelopes[2].Message) -// require.Equal(t, []byte("content5"), resp.Envelopes[3].Message) -// require.Equal(t, []byte("content3"), resp.Envelopes[4].Message) -// require.Equal(t, []byte("content1"), resp.Envelopes[5].Message) - -// thirdEnv := resp.Envelopes[2] -// fifthEnv := resp.Envelopes[4] - -// // Order descending by default -// resp, err = store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// StartTimeNs: thirdEnv.TimestampNs, -// PagingInfo: &messagev1.PagingInfo{ -// Limit: 2, -// }, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 2) -// require.Equal(t, []byte("content8"), resp.Envelopes[0].Message) -// require.Equal(t, []byte("content7"), resp.Envelopes[1].Message) - -// resp, err = store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// EndTimeNs: thirdEnv.TimestampNs, -// PagingInfo: &messagev1.PagingInfo{ -// Limit: 2, -// }, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 2) -// require.Equal(t, []byte("content6"), resp.Envelopes[0].Message) -// require.Equal(t, []byte("content5"), resp.Envelopes[1].Message) - -// resp, err = store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// StartTimeNs: fifthEnv.TimestampNs, -// EndTimeNs: thirdEnv.TimestampNs, -// PagingInfo: &messagev1.PagingInfo{ -// Limit: 4, -// }, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 3) -// require.Equal(t, []byte("content6"), resp.Envelopes[0].Message) -// require.Equal(t, []byte("content5"), resp.Envelopes[1].Message) -// require.Equal(t, []byte("content3"), resp.Envelopes[2].Message) - -// // Order ascending -// resp, err = store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// StartTimeNs: thirdEnv.TimestampNs, -// PagingInfo: &messagev1.PagingInfo{ -// Limit: 2, -// Direction: messagev1.SortDirection_SORT_DIRECTION_ASCENDING, -// }, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 2) -// require.Equal(t, []byte("content6"), resp.Envelopes[0].Message) -// require.Equal(t, []byte("content7"), resp.Envelopes[1].Message) - -// resp, err = store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// EndTimeNs: thirdEnv.TimestampNs, -// PagingInfo: &messagev1.PagingInfo{ -// Limit: 2, -// Direction: messagev1.SortDirection_SORT_DIRECTION_ASCENDING, -// }, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 2) -// require.Equal(t, []byte("content1"), resp.Envelopes[0].Message) -// require.Equal(t, []byte("content3"), resp.Envelopes[1].Message) - -// resp, err = store.QueryMessages(ctx, &messagev1.QueryRequest{ -// ContentTopics: []string{"topic1"}, -// StartTimeNs: fifthEnv.TimestampNs, -// EndTimeNs: thirdEnv.TimestampNs, -// PagingInfo: &messagev1.PagingInfo{ -// Limit: 4, -// Direction: messagev1.SortDirection_SORT_DIRECTION_ASCENDING, -// }, -// }) -// require.NoError(t, err) -// require.Len(t, resp.Envelopes, 3) -// require.Equal(t, []byte("content3"), resp.Envelopes[0].Message) -// require.Equal(t, []byte("content5"), resp.Envelopes[1].Message) -// require.Equal(t, []byte("content6"), resp.Envelopes[2].Message) -// } +func TestInsertGroupMessage_Single(t *testing.T) { + started := time.Now().UTC() + store, cleanup := NewTestStore(t) + defer cleanup() + + ctx := context.Background() + msg, err := store.InsertGroupMessage(ctx, "installation", []byte("data")) + require.NoError(t, err) + require.NotNil(t, msg) + require.Equal(t, uint64(1), msg.Id) + require.True(t, msg.CreatedAt.Before(time.Now().UTC()) && msg.CreatedAt.After(started)) + require.Equal(t, "installation", msg.GroupId) + require.Equal(t, []byte("data"), msg.Data) + + msgs := make([]*GroupMessage, 0) + err = store.db.NewSelect().Model(&msgs).Scan(ctx) + require.NoError(t, err) + require.Len(t, msgs, 1) + require.Equal(t, msg, msgs[0]) +} + +func TestInsertGroupMessage_ManyOrderedByTime(t *testing.T) { + store, cleanup := NewTestStore(t) + defer cleanup() + + ctx := context.Background() + _, err := store.InsertGroupMessage(ctx, "group", []byte("data1")) + require.NoError(t, err) + _, err = store.InsertGroupMessage(ctx, "group", []byte("data2")) + require.NoError(t, err) + _, err = store.InsertGroupMessage(ctx, "group", []byte("data3")) + require.NoError(t, err) + + msgs := make([]*GroupMessage, 0) + err = store.db.NewSelect().Model(&msgs).Order("created_at DESC").Scan(ctx) + require.NoError(t, err) + require.Len(t, msgs, 3) + require.Equal(t, []byte("data3"), msgs[0].Data) + require.Equal(t, []byte("data2"), msgs[1].Data) + require.Equal(t, []byte("data1"), msgs[2].Data) +} + +func TestInsertWelcomeMessage_Single(t *testing.T) { + started := time.Now().UTC() + store, cleanup := NewTestStore(t) + defer cleanup() + + ctx := context.Background() + msg, err := store.InsertWelcomeMessage(ctx, "group", []byte("data")) + require.NoError(t, err) + require.NotNil(t, msg) + require.Equal(t, uint64(1), msg.Id) + require.True(t, msg.CreatedAt.Before(time.Now().UTC()) && msg.CreatedAt.After(started)) + require.Equal(t, "group", msg.InstallationId) + require.Equal(t, []byte("data"), msg.Data) + + msgs := make([]*WelcomeMessage, 0) + err = store.db.NewSelect().Model(&msgs).Scan(ctx) + require.NoError(t, err) + require.Len(t, msgs, 1) + require.Equal(t, msg, msgs[0]) +} + +func TestInsertWelcomeMessage_ManyOrderedByTime(t *testing.T) { + store, cleanup := NewTestStore(t) + defer cleanup() + + ctx := context.Background() + _, err := store.InsertWelcomeMessage(ctx, "installation", []byte("data1")) + require.NoError(t, err) + _, err = store.InsertWelcomeMessage(ctx, "installation", []byte("data2")) + require.NoError(t, err) + _, err = store.InsertWelcomeMessage(ctx, "installation", []byte("data3")) + require.NoError(t, err) + + msgs := make([]*WelcomeMessage, 0) + err = store.db.NewSelect().Model(&msgs).Order("created_at DESC").Scan(ctx) + require.NoError(t, err) + require.Len(t, msgs, 3) + require.Equal(t, []byte("data3"), msgs[0].Data) + require.Equal(t, []byte("data2"), msgs[1].Data) + require.Equal(t, []byte("data1"), msgs[2].Data) +} + +func TestQueryGroupMessagesV1_MissingGroup(t *testing.T) { + store, cleanup := NewTestStore(t) + defer cleanup() + + ctx := context.Background() + + resp, err := store.QueryGroupMessagesV1(ctx, &mlsv1.QueryGroupMessagesRequest{}) + require.EqualError(t, err, "group is required") + require.Nil(t, resp) +} + +func TestQueryWelcomeMessagesV1_MissingInstallation(t *testing.T) { + store, cleanup := NewTestStore(t) + defer cleanup() + + ctx := context.Background() + + resp, err := store.QueryWelcomeMessagesV1(ctx, &mlsv1.QueryWelcomeMessagesRequest{}) + require.EqualError(t, err, "installation is required") + require.Nil(t, resp) +} + +func TestQueryGroupMessagesV1_Filter(t *testing.T) { + store, cleanup := NewTestStore(t) + defer cleanup() + + ctx := context.Background() + _, err := store.InsertGroupMessage(ctx, "group1", []byte("data1")) + require.NoError(t, err) + _, err = store.InsertGroupMessage(ctx, "group2", []byte("data2")) + require.NoError(t, err) + _, err = store.InsertGroupMessage(ctx, "group1", []byte("data3")) + require.NoError(t, err) + + resp, err := store.QueryGroupMessagesV1(ctx, &mlsv1.QueryGroupMessagesRequest{ + GroupId: "unknown", + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 0) + + resp, err = store.QueryGroupMessagesV1(ctx, &mlsv1.QueryGroupMessagesRequest{ + GroupId: "group1", + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 2) + require.Equal(t, []byte("data3"), resp.Messages[0].GetV1().Data) + require.Equal(t, []byte("data1"), resp.Messages[1].GetV1().Data) + + resp, err = store.QueryGroupMessagesV1(ctx, &mlsv1.QueryGroupMessagesRequest{ + GroupId: "group2", + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 1) + require.Equal(t, []byte("data2"), resp.Messages[0].GetV1().Data) + + // Sort ascending + resp, err = store.QueryGroupMessagesV1(ctx, &mlsv1.QueryGroupMessagesRequest{ + GroupId: "group1", + PagingInfo: &mlsv1.PagingInfo{ + Direction: mlsv1.SortDirection_SORT_DIRECTION_ASCENDING, + }, + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 2) + require.Equal(t, []byte("data1"), resp.Messages[0].GetV1().Data) + require.Equal(t, []byte("data3"), resp.Messages[1].GetV1().Data) +} + +func TestQueryWelcomeMessagesV1_Filter(t *testing.T) { + store, cleanup := NewTestStore(t) + defer cleanup() + + ctx := context.Background() + _, err := store.InsertWelcomeMessage(ctx, "installation1", []byte("data1")) + require.NoError(t, err) + _, err = store.InsertWelcomeMessage(ctx, "installation2", []byte("data2")) + require.NoError(t, err) + _, err = store.InsertWelcomeMessage(ctx, "installation1", []byte("data3")) + require.NoError(t, err) + + resp, err := store.QueryWelcomeMessagesV1(ctx, &mlsv1.QueryWelcomeMessagesRequest{ + InstallationId: "unknown", + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 0) + + resp, err = store.QueryWelcomeMessagesV1(ctx, &mlsv1.QueryWelcomeMessagesRequest{ + InstallationId: "installation1", + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 2) + require.Equal(t, []byte("data3"), resp.Messages[0].GetV1().Data) + require.Equal(t, []byte("data1"), resp.Messages[1].GetV1().Data) + + resp, err = store.QueryWelcomeMessagesV1(ctx, &mlsv1.QueryWelcomeMessagesRequest{ + InstallationId: "installation2", + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 1) + require.Equal(t, []byte("data2"), resp.Messages[0].GetV1().Data) + + // Sort ascending + resp, err = store.QueryWelcomeMessagesV1(ctx, &mlsv1.QueryWelcomeMessagesRequest{ + InstallationId: "installation1", + PagingInfo: &mlsv1.PagingInfo{ + Direction: mlsv1.SortDirection_SORT_DIRECTION_ASCENDING, + }, + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 2) + require.Equal(t, []byte("data1"), resp.Messages[0].GetV1().Data) + require.Equal(t, []byte("data3"), resp.Messages[1].GetV1().Data) +} + +func TestQueryGroupMessagesV1_Paginate(t *testing.T) { + store, cleanup := NewTestStore(t) + defer cleanup() + + ctx := context.Background() + _, err := store.InsertGroupMessage(ctx, "group1", []byte("content1")) + require.NoError(t, err) + _, err = store.InsertGroupMessage(ctx, "group2", []byte("content2")) + require.NoError(t, err) + _, err = store.InsertGroupMessage(ctx, "group1", []byte("content3")) + require.NoError(t, err) + _, err = store.InsertGroupMessage(ctx, "group2", []byte("content4")) + require.NoError(t, err) + _, err = store.InsertGroupMessage(ctx, "group1", []byte("content5")) + require.NoError(t, err) + _, err = store.InsertGroupMessage(ctx, "group1", []byte("content6")) + require.NoError(t, err) + _, err = store.InsertGroupMessage(ctx, "group1", []byte("content7")) + require.NoError(t, err) + _, err = store.InsertGroupMessage(ctx, "group1", []byte("content8")) + require.NoError(t, err) + + resp, err := store.QueryGroupMessagesV1(ctx, &mlsv1.QueryGroupMessagesRequest{ + GroupId: "group1", + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 6) + require.Equal(t, []byte("content8"), resp.Messages[0].GetV1().Data) + require.Equal(t, []byte("content7"), resp.Messages[1].GetV1().Data) + require.Equal(t, []byte("content6"), resp.Messages[2].GetV1().Data) + require.Equal(t, []byte("content5"), resp.Messages[3].GetV1().Data) + require.Equal(t, []byte("content3"), resp.Messages[4].GetV1().Data) + require.Equal(t, []byte("content1"), resp.Messages[5].GetV1().Data) + + thirdMsg := resp.Messages[2] + fifthMsg := resp.Messages[4] + + resp, err = store.QueryGroupMessagesV1(ctx, &mlsv1.QueryGroupMessagesRequest{ + GroupId: "group1", + PagingInfo: &mlsv1.PagingInfo{ + Limit: 2, + }, + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 2) + require.Equal(t, []byte("content8"), resp.Messages[0].GetV1().Data) + require.Equal(t, []byte("content7"), resp.Messages[1].GetV1().Data) + + // Order descending by default + resp, err = store.QueryGroupMessagesV1(ctx, &mlsv1.QueryGroupMessagesRequest{ + GroupId: "group1", + PagingInfo: &mlsv1.PagingInfo{ + Limit: 2, + Cursor: thirdMsg.GetV1().Id, + }, + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 2) + require.Equal(t, []byte("content5"), resp.Messages[0].GetV1().Data) + require.Equal(t, []byte("content3"), resp.Messages[1].GetV1().Data) + + // Next page from previous response + resp, err = store.QueryGroupMessagesV1(ctx, &mlsv1.QueryGroupMessagesRequest{ + GroupId: "group1", + PagingInfo: resp.PagingInfo, + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 1) + require.Equal(t, []byte("content1"), resp.Messages[0].GetV1().Data) + + // Order ascending + resp, err = store.QueryGroupMessagesV1(ctx, &mlsv1.QueryGroupMessagesRequest{ + GroupId: "group1", + PagingInfo: &mlsv1.PagingInfo{ + Limit: 2, + Direction: mlsv1.SortDirection_SORT_DIRECTION_ASCENDING, + Cursor: fifthMsg.GetV1().Id, + }, + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 2) + require.Equal(t, []byte("content5"), resp.Messages[0].GetV1().Data) + require.Equal(t, []byte("content6"), resp.Messages[1].GetV1().Data) + + // Next page from previous response + resp, err = store.QueryGroupMessagesV1(ctx, &mlsv1.QueryGroupMessagesRequest{ + GroupId: "group1", + PagingInfo: resp.PagingInfo, + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 2) + require.Equal(t, []byte("content7"), resp.Messages[0].GetV1().Data) + require.Equal(t, []byte("content8"), resp.Messages[1].GetV1().Data) +} + +func TestQueryWelcomeMessagesV1_Paginate(t *testing.T) { + store, cleanup := NewTestStore(t) + defer cleanup() + + ctx := context.Background() + _, err := store.InsertWelcomeMessage(ctx, "installation1", []byte("content1")) + require.NoError(t, err) + _, err = store.InsertWelcomeMessage(ctx, "installation2", []byte("content2")) + require.NoError(t, err) + _, err = store.InsertWelcomeMessage(ctx, "installation1", []byte("content3")) + require.NoError(t, err) + _, err = store.InsertWelcomeMessage(ctx, "installation2", []byte("content4")) + require.NoError(t, err) + _, err = store.InsertWelcomeMessage(ctx, "installation1", []byte("content5")) + require.NoError(t, err) + _, err = store.InsertWelcomeMessage(ctx, "installation1", []byte("content6")) + require.NoError(t, err) + _, err = store.InsertWelcomeMessage(ctx, "installation1", []byte("content7")) + require.NoError(t, err) + _, err = store.InsertWelcomeMessage(ctx, "installation1", []byte("content8")) + require.NoError(t, err) + + resp, err := store.QueryWelcomeMessagesV1(ctx, &mlsv1.QueryWelcomeMessagesRequest{ + InstallationId: "installation1", + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 6) + require.Equal(t, []byte("content8"), resp.Messages[0].GetV1().Data) + require.Equal(t, []byte("content7"), resp.Messages[1].GetV1().Data) + require.Equal(t, []byte("content6"), resp.Messages[2].GetV1().Data) + require.Equal(t, []byte("content5"), resp.Messages[3].GetV1().Data) + require.Equal(t, []byte("content3"), resp.Messages[4].GetV1().Data) + require.Equal(t, []byte("content1"), resp.Messages[5].GetV1().Data) + + thirdMsg := resp.Messages[2] + fifthMsg := resp.Messages[4] + + resp, err = store.QueryWelcomeMessagesV1(ctx, &mlsv1.QueryWelcomeMessagesRequest{ + InstallationId: "installation1", + PagingInfo: &mlsv1.PagingInfo{ + Limit: 2, + }, + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 2) + require.Equal(t, []byte("content8"), resp.Messages[0].GetV1().Data) + require.Equal(t, []byte("content7"), resp.Messages[1].GetV1().Data) + + // Order descending by default + resp, err = store.QueryWelcomeMessagesV1(ctx, &mlsv1.QueryWelcomeMessagesRequest{ + InstallationId: "installation1", + PagingInfo: &mlsv1.PagingInfo{ + Limit: 2, + Cursor: thirdMsg.GetV1().Id, + }, + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 2) + require.Equal(t, []byte("content5"), resp.Messages[0].GetV1().Data) + require.Equal(t, []byte("content3"), resp.Messages[1].GetV1().Data) + + // Next page from previous response + resp, err = store.QueryWelcomeMessagesV1(ctx, &mlsv1.QueryWelcomeMessagesRequest{ + InstallationId: "installation1", + PagingInfo: resp.PagingInfo, + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 1) + require.Equal(t, []byte("content1"), resp.Messages[0].GetV1().Data) + + // Order ascending + resp, err = store.QueryWelcomeMessagesV1(ctx, &mlsv1.QueryWelcomeMessagesRequest{ + InstallationId: "installation1", + PagingInfo: &mlsv1.PagingInfo{ + Limit: 2, + Direction: mlsv1.SortDirection_SORT_DIRECTION_ASCENDING, + Cursor: fifthMsg.GetV1().Id, + }, + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 2) + require.Equal(t, []byte("content5"), resp.Messages[0].GetV1().Data) + require.Equal(t, []byte("content6"), resp.Messages[1].GetV1().Data) + + // Next page from previous response + resp, err = store.QueryWelcomeMessagesV1(ctx, &mlsv1.QueryWelcomeMessagesRequest{ + InstallationId: "installation1", + PagingInfo: resp.PagingInfo, + }) + require.NoError(t, err) + require.Len(t, resp.Messages, 2) + require.Equal(t, []byte("content7"), resp.Messages[0].GetV1().Data) + require.Equal(t, []byte("content8"), resp.Messages[1].GetV1().Data) +} func nowNs() int64 { return time.Now().UTC().UnixNano()