diff --git a/server/backend/sync/pubsub.go b/server/backend/sync/pubsub.go index cf9547f27..8ba605d7d 100644 --- a/server/backend/sync/pubsub.go +++ b/server/backend/sync/pubsub.go @@ -36,7 +36,7 @@ func NewSubscription(subscriber *time.ActorID) *Subscription { return &Subscription{ id: xid.New().String(), subscriber: subscriber, - events: make(chan DocEvent, 1), + events: make(chan DocEvent, 3), } } diff --git a/test/integration/presence_test.go b/test/integration/presence_test.go index f8cd15e50..3428b4e8e 100644 --- a/test/integration/presence_test.go +++ b/test/integration/presence_test.go @@ -49,12 +49,18 @@ func TestPresence(t *testing.T) { rch, err := c1.Attach(ctx, d1) assert.NoError(t, err) assert.NotNil(t, rch) - defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() - d2 := document.New(helper.TestDocKey(t)) - rch, err = c2.Attach(ctx, d2) + + defer func() { + assert.NoError(t, c1.Detach(ctx, d1)) + }() + d2 := document.New(helper.TestDocKey(t), 2) + rch2, err := c2.Attach(ctx, d2) + assert.NoError(t, err) - assert.NotNil(t, rch) - defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }() + assert.NotNil(t, rch2) + defer func() { + assert.NoError(t, c2.Detach(ctx, d2)) + }() // 02. Update the root of the document and presence assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error { @@ -173,7 +179,7 @@ func TestPresence(t *testing.T) { Presences: wr.Presences, }) } - if len(responsePairs) == 3 { + if len(responsePairs) == 1 { return } } @@ -181,12 +187,6 @@ func TestPresence(t *testing.T) { }() // 03. Watch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentWatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): {}, - }, - }) watch2Ctx, cancel2 := context.WithCancel(ctx) rch2, err := c2.Attach(watch2Ctx, d2) assert.NoError(t, err) @@ -209,12 +209,6 @@ func TestPresence(t *testing.T) { assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) // 05. Unwatch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentUnwatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): d2.MyPresence(), - }, - }) cancel2() wgEvents.Wait() @@ -260,7 +254,7 @@ func TestPresence(t *testing.T) { }) } - if len(responsePairs) == 3 { + if len(responsePairs) == 1 { return } } @@ -268,12 +262,6 @@ func TestPresence(t *testing.T) { }() // 03. Watch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentWatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): {}, - }, - }) watch2Ctx, cancel2 := context.WithCancel(ctx) defer cancel2() rch2, err := c2.Attach(watch2Ctx, d2) @@ -296,12 +284,6 @@ func TestPresence(t *testing.T) { assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) // 05. Unwatch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentUnwatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): d2.MyPresence(), - }, - }) assert.NoError(t, c2.Detach(ctx, d2)) assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) wgEvents.Wait() @@ -349,7 +331,7 @@ func TestPresence(t *testing.T) { }) } - if len(responsePairs) == 3 { + if len(responsePairs) == 1 { return } } @@ -357,12 +339,6 @@ func TestPresence(t *testing.T) { }() // 03. Watch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentWatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): {}, - }, - }) watch2Ctx, cancel2 := context.WithCancel(ctx) defer cancel2() rch2, err := c2.Attach(watch2Ctx, d2) @@ -385,12 +361,6 @@ func TestPresence(t *testing.T) { assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) // 05. Unwatch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentUnwatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): d2.MyPresence(), - }, - }) cancel2() assert.NoError(t, c2.Detach(ctx, d2)) @@ -443,7 +413,7 @@ func TestPresence(t *testing.T) { }) } - if len(responsePairs) == 2 { + if len(responsePairs) == 1 { return } } @@ -452,12 +422,6 @@ func TestPresence(t *testing.T) { // 03. The second client attaches a document with the same key as the first client's document // and another document with a different key. - expected = append(expected, watchResponsePair{ - Type: client.DocumentWatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): d2.MyPresence(), - }, - }) watch2Ctx, cancel2 := context.WithCancel(ctx) rch2, err := c2.Attach(watch2Ctx, d2) assert.NoError(t, err) @@ -472,13 +436,31 @@ func TestPresence(t *testing.T) { assert.NoError(t, err) defer func() { assert.NoError(t, c2.Detach(ctx, d3)) }() - // 04. The second client unwatch the documents attached by itself. + // 04. Update clients presence. + err = d3.Update(func(root *json.Object, p *presence.Presence) error { + p.Set("updated", "true") + return nil + }) + assert.NoError(t, err) + + err = d2.Update(func(root *json.Object, p *presence.Presence) error { + p.Set("updated", "true") + return nil + }) + assert.NoError(t, err) + + assert.NoError(t, c2.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)+"2"))) + assert.NoError(t, c2.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) + assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) + expected = append(expected, watchResponsePair{ - Type: client.DocumentUnwatched, + Type: client.PresenceChanged, Presences: map[string]innerpresence.Presence{ - c2.ID().String(): {}, + c2.ID().String(): d2.MyPresence(), }, }) + + // 04. The second client unwatch the documents attached by itself. cancel2() cancel3()