Skip to content

Commit

Permalink
Move Client.Watch inside Client.Attach and hide it from external inte…
Browse files Browse the repository at this point in the history
…rface

remove Client.Watch from external interface and move into Client.Attach, so watching a document will be executed continously after attaching continuously (and atomically).
  • Loading branch information
karockai authored and krapie committed Feb 13, 2024
1 parent a8f6bc2 commit 840118f
Show file tree
Hide file tree
Showing 14 changed files with 210 additions and 128 deletions.
30 changes: 18 additions & 12 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,19 @@ func (c *Client) Deactivate(ctx context.Context) error {

// Attach attaches the given document to this client. It tells the server that
// this client will synchronize the given document.
func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...AttachOption) error {
// If the context "ctx" is canceled or timed out, returned channel will be closed
// and "WatchResponse" from this closed channel has zero events and nil "Err()".
func (c *Client) Attach(
ctx context.Context,
doc *document.Document,
options ...AttachOption,
) (<-chan WatchResponse, error) {
if c.status != activated {
return ErrClientNotActivated
return nil, ErrClientNotActivated
}

if doc.Status() != document.StatusDetached {
return ErrDocumentNotDetached
return nil, ErrDocumentNotDetached
}

opts := &AttachOptions{}
Expand All @@ -277,12 +283,12 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
p.Initialize(opts.Presence)
return nil
}); err != nil {
return err
return nil, err
}

pbChangePack, err := converter.ToChangePack(doc.CreateChangePack())
if err != nil {
return err
return nil, err
}

res, err := c.client.AttachDocument(
Expand All @@ -293,16 +299,16 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
},
), c.options.APIKey, doc.Key().String()))
if err != nil {
return err
return nil, err
}

pack, err := converter.FromChangePack(res.Msg.ChangePack)
if err != nil {
return err
return nil, err
}

if err := doc.ApplyChangePack(pack); err != nil {
return err
return nil, err
}
if c.logger.Core().Enabled(zap.DebugLevel) {
c.logger.Debug(fmt.Sprintf(
Expand All @@ -313,7 +319,7 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
}

if doc.Status() == document.StatusRemoved {
return nil
return nil, nil
}

doc.SetStatus(document.StatusAttached)
Expand All @@ -322,7 +328,7 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
docID: types.ID(res.Msg.DocumentId),
}

return nil
return c.watch(ctx, doc)
}

// Detach detaches the given document from this client. It tells the
Expand Down Expand Up @@ -406,12 +412,12 @@ func (c *Client) Sync(ctx context.Context, options ...SyncOptions) error {
return nil
}

// Watch subscribes to events on a given documentIDs.
// watch subscribes to events on a given documentIDs.
// If an error occurs before stream initialization, the second response, error,
// is returned. If the context "ctx" is canceled or timed out, returned channel
// is closed, and "WatchResponse" from this closed channel has zero events and
// nil "Err()".
func (c *Client) Watch(
func (c *Client) watch(
ctx context.Context,
doc *document.Document,
) (<-chan WatchResponse, error) {
Expand Down
19 changes: 9 additions & 10 deletions test/bench/grpc_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func BenchmarkRPC(b *testing.B) {
assert.NoError(b, err)

d1 := document.New("doc1")
err = cli.Attach(ctx, d1)
_, err = cli.Attach(ctx, d1)
assert.NoError(b, err)

for i := 0; i < b.N; i++ {
Expand All @@ -202,8 +202,9 @@ func BenchmarkRPC(b *testing.B) {
ctx := context.Background()

d1 := document.New(helper.TestDocKey(b))
err := c1.Attach(ctx, d1)
rch1, err := c1.Attach(ctx, d1)
assert.NoError(b, err)
assert.NotNil(b, rch1)
testKey1 := "testKey1"
err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewText(testKey1)
Expand All @@ -212,7 +213,8 @@ func BenchmarkRPC(b *testing.B) {
assert.NoError(b, err)

d2 := document.New(helper.TestDocKey(b))
err = c2.Attach(ctx, d2)
rch2, err := c2.Attach(ctx, d2)
assert.NotNil(b, rch2)
assert.NoError(b, err)
testKey2 := "testKey2"
err = d2.Update(func(root *json.Object, p *presence.Presence) error {
Expand All @@ -221,11 +223,6 @@ func BenchmarkRPC(b *testing.B) {
})
assert.NoError(b, err)

rch1, err := c1.Watch(ctx, d1)
assert.NoError(b, err)
rch2, err := c2.Watch(ctx, d2)
assert.NoError(b, err)

done1 := make(chan bool)
done2 := make(chan bool)

Expand Down Expand Up @@ -286,13 +283,15 @@ func BenchmarkRPC(b *testing.B) {
wg.Add(2)
go func() {
defer wg.Done()
err := c1.Attach(ctx, doc1)
rch1, err := c1.Attach(ctx, doc1)
assert.NoError(b, err)
assert.NotNil(b, rch1)
}()
go func() {
defer wg.Done()
err := c2.Attach(ctx, doc2)
rch2, err := c2.Attach(ctx, doc2)
assert.NoError(b, err)
assert.NotNil(b, rch2)
}()
wg.Wait()
}()
Expand Down
14 changes: 9 additions & 5 deletions test/integration/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func TestAdmin(t *testing.T) {
assert.Equal(t, connect.CodeNotFound, connect.CodeOf(err))

// 02. client creates a document then admin removes the document.
assert.NoError(t, cli.Attach(ctx, d1))
rch, err := cli.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)
err = adminCli.RemoveDocument(ctx, "default", d1.Key().String(), true)
assert.NoError(t, err)
assert.Equal(t, document.StatusAttached, d1.Status())
Expand All @@ -88,11 +90,11 @@ func TestAdmin(t *testing.T) {

// 01. c1 attaches and watches d1.
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
rch, err := c1.Attach(watchCtx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)
wg := sync.WaitGroup{}
wg.Add(1)
rch, err := c1.Watch(watchCtx, d1)
assert.NoError(t, err)
go func() {
defer wg.Done()

Expand Down Expand Up @@ -137,7 +139,9 @@ func TestAdmin(t *testing.T) {
assert.Equal(t, connect.CodeNotFound, connect.CodeOf(err))

// 02. try to remove document that is attached by the client.
assert.NoError(t, cli.Attach(ctx, doc))
rch, err := cli.Attach(ctx, doc)
assert.NoError(t, err)
assert.NotNil(t, rch)
err = adminCli.RemoveDocument(ctx, "default", doc.Key().String(), false)
assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err))
assert.Equal(t, document.StatusAttached, doc.Status())
Expand Down
38 changes: 26 additions & 12 deletions test/integration/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ func TestArray(t *testing.T) {
t.Run("causal nested array test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").
Expand All @@ -53,17 +54,19 @@ func TestArray(t *testing.T) {
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

syncClientsThenAssertEqual(t, []clientAndDocPair{{c1, d1}, {c2, d2}})
})

t.Run("concurrent array add/delete simple test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddString("v1", "v2")
Expand All @@ -75,8 +78,9 @@ func TestArray(t *testing.T) {
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.GetArray("k1").Delete(1)
Expand All @@ -96,8 +100,9 @@ func TestArray(t *testing.T) {
t.Run("concurrent array add/delete test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddString("v1")
Expand All @@ -108,8 +113,9 @@ func TestArray(t *testing.T) {
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.GetArray("k1").AddString("v2", "v3")
Expand All @@ -130,8 +136,9 @@ func TestArray(t *testing.T) {
t.Run("concurrent array delete test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddString("v1", "v2", "v3")
Expand All @@ -142,8 +149,9 @@ func TestArray(t *testing.T) {
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.GetArray("k1").Delete(1)
Expand All @@ -169,8 +177,9 @@ func TestArray(t *testing.T) {
t.Run("concurrent array move test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddInteger(0, 1, 2)
Expand All @@ -182,8 +191,9 @@ func TestArray(t *testing.T) {
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
prev := root.GetArray("k1").Get(0)
Expand All @@ -209,15 +219,19 @@ func TestArray(t *testing.T) {
t.Run("concurrent array move with the same position test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)
assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddInteger(0, 1, 2)
assert.Equal(t, `{"k1":[0,1,2]}`, root.Marshal())
return nil
}))
assert.NoError(t, c1.Sync(ctx))
d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error {
next := root.GetArray("k1").Get(0)
Expand Down
13 changes: 9 additions & 4 deletions test/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,14 @@ func TestClient(t *testing.T) {

// 01. c1, c2, c3 attach to the same document.
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
_, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
_, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
d3 := document.New(helper.TestDocKey(t))
assert.NoError(t, c3.Attach(ctx, d3))
_, err = c3.Attach(ctx, d3)
assert.NoError(t, err)

// 02. c1, c2 sync with push-pull mode.
assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error {
Expand Down Expand Up @@ -136,7 +139,9 @@ func TestClient(t *testing.T) {
// 01. cli attach to the same document having counter.
ctx := context.Background()
doc := document.New(helper.TestDocKey(t))
assert.NoError(t, cli.Attach(ctx, doc))
rch, err := cli.Attach(ctx, doc)
assert.NoError(t, err)
assert.NotNil(t, rch)

// 02. cli update the document with creating a counter
// and sync with push-pull mode: CP(1, 1) -> CP(2, 2)
Expand Down
12 changes: 8 additions & 4 deletions test/integration/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ func TestCounter(t *testing.T) {
t.Run("causal counter.increase test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewCounter("age", crdt.LongCnt, 1).
Expand All @@ -55,17 +56,19 @@ func TestCounter(t *testing.T) {
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

syncClientsThenAssertEqual(t, []clientAndDocPair{{c1, d1}, {c2, d2}})
})

t.Run("concurrent counter increase test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewCounter("age", crdt.IntegerCnt, 0)
Expand All @@ -78,8 +81,9 @@ func TestCounter(t *testing.T) {
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.GetCounter("age").
Expand Down
Loading

0 comments on commit 840118f

Please sign in to comment.