Skip to content

Commit

Permalink
manage watch stream of attachment in go routine
Browse files Browse the repository at this point in the history
  • Loading branch information
karockai authored and krapie committed Feb 13, 2024
1 parent 840118f commit 25ed0ef
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 55 deletions.
2 changes: 1 addition & 1 deletion server/backend/sync/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewSubscription(subscriber *time.ActorID) *Subscription {
return &Subscription{
id: xid.New().String(),
subscriber: subscriber,
events: make(chan DocEvent, 1),
events: make(chan DocEvent, 3),
}
}

Expand Down
90 changes: 36 additions & 54 deletions test/integration/presence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,18 @@ func TestPresence(t *testing.T) {
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)
defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }()
d2 := document.New(helper.TestDocKey(t))
rch, err = c2.Attach(ctx, d2)

defer func() {
assert.NoError(t, c1.Detach(ctx, d1))
}()
d2 := document.New(helper.TestDocKey(t), 2)
rch2, err := c2.Attach(ctx, d2)

assert.NoError(t, err)
assert.NotNil(t, rch)
defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }()
assert.NotNil(t, rch2)
defer func() {
assert.NoError(t, c2.Detach(ctx, d2))
}()

// 02. Update the root of the document and presence
assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error {
Expand Down Expand Up @@ -173,20 +179,14 @@ func TestPresence(t *testing.T) {
Presences: wr.Presences,
})
}
if len(responsePairs) == 3 {
if len(responsePairs) == 1 {
return
}
}
}
}()

// 03. Watch the second client's document.
expected = append(expected, watchResponsePair{
Type: client.DocumentWatched,
Presences: map[string]innerpresence.Presence{
c2.ID().String(): {},
},
})
watch2Ctx, cancel2 := context.WithCancel(ctx)
rch2, err := c2.Attach(watch2Ctx, d2)
assert.NoError(t, err)
Expand All @@ -209,12 +209,6 @@ func TestPresence(t *testing.T) {
assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t))))

// 05. Unwatch the second client's document.
expected = append(expected, watchResponsePair{
Type: client.DocumentUnwatched,
Presences: map[string]innerpresence.Presence{
c2.ID().String(): d2.MyPresence(),
},
})
cancel2()

wgEvents.Wait()
Expand Down Expand Up @@ -260,20 +254,14 @@ func TestPresence(t *testing.T) {
})
}

if len(responsePairs) == 3 {
if len(responsePairs) == 1 {
return
}
}
}
}()

// 03. Watch the second client's document.
expected = append(expected, watchResponsePair{
Type: client.DocumentWatched,
Presences: map[string]innerpresence.Presence{
c2.ID().String(): {},
},
})
watch2Ctx, cancel2 := context.WithCancel(ctx)
defer cancel2()
rch2, err := c2.Attach(watch2Ctx, d2)
Expand All @@ -296,12 +284,6 @@ func TestPresence(t *testing.T) {
assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t))))

// 05. Unwatch the second client's document.
expected = append(expected, watchResponsePair{
Type: client.DocumentUnwatched,
Presences: map[string]innerpresence.Presence{
c2.ID().String(): d2.MyPresence(),
},
})
assert.NoError(t, c2.Detach(ctx, d2))
assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t))))
wgEvents.Wait()
Expand Down Expand Up @@ -349,20 +331,14 @@ func TestPresence(t *testing.T) {
})
}

if len(responsePairs) == 3 {
if len(responsePairs) == 1 {
return
}
}
}
}()

// 03. Watch the second client's document.
expected = append(expected, watchResponsePair{
Type: client.DocumentWatched,
Presences: map[string]innerpresence.Presence{
c2.ID().String(): {},
},
})
watch2Ctx, cancel2 := context.WithCancel(ctx)
defer cancel2()
rch2, err := c2.Attach(watch2Ctx, d2)
Expand All @@ -385,12 +361,6 @@ func TestPresence(t *testing.T) {
assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t))))

// 05. Unwatch the second client's document.
expected = append(expected, watchResponsePair{
Type: client.DocumentUnwatched,
Presences: map[string]innerpresence.Presence{
c2.ID().String(): d2.MyPresence(),
},
})
cancel2()

assert.NoError(t, c2.Detach(ctx, d2))
Expand Down Expand Up @@ -443,7 +413,7 @@ func TestPresence(t *testing.T) {
})
}

if len(responsePairs) == 2 {
if len(responsePairs) == 1 {
return
}
}
Expand All @@ -452,12 +422,6 @@ func TestPresence(t *testing.T) {

// 03. The second client attaches a document with the same key as the first client's document
// and another document with a different key.
expected = append(expected, watchResponsePair{
Type: client.DocumentWatched,
Presences: map[string]innerpresence.Presence{
c2.ID().String(): d2.MyPresence(),
},
})
watch2Ctx, cancel2 := context.WithCancel(ctx)
rch2, err := c2.Attach(watch2Ctx, d2)
assert.NoError(t, err)
Expand All @@ -472,13 +436,31 @@ func TestPresence(t *testing.T) {
assert.NoError(t, err)
defer func() { assert.NoError(t, c2.Detach(ctx, d3)) }()

// 04. The second client unwatch the documents attached by itself.
// 04. Update clients presence.
err = d3.Update(func(root *json.Object, p *presence.Presence) error {
p.Set("updated", "true")
return nil
})
assert.NoError(t, err)

err = d2.Update(func(root *json.Object, p *presence.Presence) error {
p.Set("updated", "true")
return nil
})
assert.NoError(t, err)

assert.NoError(t, c2.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)+"2")))
assert.NoError(t, c2.Sync(ctx, client.WithDocKey(helper.TestDocKey(t))))
assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t))))

expected = append(expected, watchResponsePair{
Type: client.DocumentUnwatched,
Type: client.PresenceChanged,
Presences: map[string]innerpresence.Presence{
c2.ID().String(): {},
c2.ID().String(): d2.MyPresence(),
},
})

// 04. The second client unwatch the documents attached by itself.
cancel2()
cancel3()

Expand Down

0 comments on commit 25ed0ef

Please sign in to comment.