Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More history tests #426

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions internal/service/history/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package history

import (
"crypto/rand"
"encoding/json"
"testing"

"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/network/mqtt"
"github.com/emitter-io/emitter/internal/provider/storage"
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/emitter/internal/service/fake"
Expand Down Expand Up @@ -82,3 +84,115 @@ func TestHistory(t *testing.T) {
// The response should have returned the last 2 messages.
assert.Equal(t, 2, len(response.(*Response).Messages))
}

func TestLargeMessage(t *testing.T) {
ssid := message.Ssid{1, 3238259379, 500706888, 1027807523}
store := storage.NewInMemory(nil)
store.Configure(nil)
auth := &fake.Authorizer{
Success: true,
Contract: uint32(1),
ExtraPerm: security.AllowLoad,
}
// Create new service
service := New(auth, store)
connection := &fake.Conn{}

// The most basic request, on an empty store.
request := &Request{
Key: "key",
Channel: "key/a/b/c/",
}

// Store 1 long message
// Keep in mind the message will be composed of the ID and the channel size on top of the payload.
// So mqttMaxMessageSize is really smaller than the actual message size.
randomBytes := make([]byte, mqtt.MaxMessageSize)
rand.Read(randomBytes)
firstSSID := message.NewID(ssid)
store.Store(&message.Message{
ID: firstSSID,
Channel: []byte("a/b/c/"),
Payload: randomBytes,
TTL: 30,
})

reqBytes, _ := json.Marshal(request)

// Issue the same request
response, ok := service.OnRequest(connection, reqBytes)
// The request should have succeeded and returned a response.
assert.Equal(t, true, ok)
// The response should have returned the last message as per MQTT spec.
assert.Equal(t, 0, len(response.(*Response).Messages))
}

// ONLY PASSES BECAUSE OF THE BUG, THERE IS ONLY ONE SERVER SO NO GATHER
// match.Limit(limit) only limits based on the number of messages not the size of the frame
/*func (s *SSD) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) {

// Construct a query and lookup locally first
query := newLookupQuery(ssid, from, until, startFromID, limit)
match := s.lookup(query)

// Issue the message survey to the cluster
if req, err := binary.Marshal(query); err == nil && s.survey != nil {
if awaiter, err := s.survey.Query("ssdstore", req); err == nil {

// Wait for all presence updates to come back (or a deadline)
for _, resp := range awaiter.Gather(2000 * time.Millisecond) {
if frame, err := message.DecodeFrame(resp); err == nil {
match = append(match, frame...)
}
}
}
}

match.Limit(limit)
return match, nil
}*/
func TestSumOfTwoExceedMaxSize(t *testing.T) {
ssid := message.Ssid{1, 3238259379, 500706888, 1027807523}
store := storage.NewInMemory(nil)
store.Configure(nil)
auth := &fake.Authorizer{
Success: true,
Contract: uint32(1),
ExtraPerm: security.AllowLoad,
}
// Create new service
service := New(auth, store)
connection := &fake.Conn{}

// The most basic request, on an empty store.
request := &Request{
Key: "key",
Channel: "key/a/b/c/",
}

// Store 2 messages
randomBytes := make([]byte, int(mqtt.MaxMessageSize/2))
rand.Read(randomBytes)
firstSSID := message.NewID(ssid)
store.Store(&message.Message{
ID: firstSSID,
Channel: []byte("a/b/c/"),
Payload: randomBytes,
TTL: 30,
})
store.Store(&message.Message{
ID: message.NewID(ssid),
Channel: []byte("a/b/c/"),
Payload: randomBytes,
TTL: 30,
})
reqBytes, _ := json.Marshal(request)

request.Channel = "key/a/b/c/?last=2"
reqBytes, _ = json.Marshal(request)
response, ok := service.OnRequest(connection, reqBytes)
// The request should have succeeded and returned a response.
assert.Equal(t, true, ok)
// The response should have returned the last 2 messages.
assert.Equal(t, 1, len(response.(*Response).Messages))
}
Loading