Skip to content

Commit

Permalink
Move Client.Watch inside Client.Attach and hide it from external inte…
Browse files Browse the repository at this point in the history
…rface

remove Client.Watch from external interface and move into Client.Attach, so watching a document will be executed continously after attaching continuously (and atomically).
  • Loading branch information
karockai committed Aug 5, 2023
1 parent 7f39f2f commit 7dbefe0
Show file tree
Hide file tree
Showing 18 changed files with 326 additions and 212 deletions.
30 changes: 18 additions & 12 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,19 @@ func (c *Client) Deactivate(ctx context.Context) error {

// Attach attaches the given document to this client. It tells the server that
// this client will synchronize the given document.
func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...AttachOption) error {
// If the context "ctx" is canceled or timed out, returned channel will be closed
// and "WatchResponse" from this closed channel has zero events and nil "Err()".
func (c *Client) Attach(
ctx context.Context,
doc *document.Document,
options ...AttachOption,
) (<-chan WatchResponse, error) {
if c.status != activated {
return ErrClientNotActivated
return nil, ErrClientNotActivated
}

if doc.Status() != document.StatusDetached {
return ErrDocumentNotDetached
return nil, ErrDocumentNotDetached
}

opts := &AttachOptions{}
Expand All @@ -265,12 +271,12 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
p.Initialize(opts.Presence)
return nil
}); err != nil {
return err
return nil, err
}

pbChangePack, err := converter.ToChangePack(doc.CreateChangePack())
if err != nil {
return err
return nil, err
}

res, err := c.client.AttachDocument(
Expand All @@ -281,16 +287,16 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
},
)
if err != nil {
return err
return nil, err
}

pack, err := converter.FromChangePack(res.ChangePack)
if err != nil {
return err
return nil, err
}

if err := doc.ApplyChangePack(pack); err != nil {
return err
return nil, err
}
if c.logger.Core().Enabled(zap.DebugLevel) {
c.logger.Debug(fmt.Sprintf(
Expand All @@ -301,7 +307,7 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
}

if doc.Status() == document.StatusRemoved {
return nil
return nil, nil
}

doc.SetStatus(document.StatusAttached)
Expand All @@ -310,7 +316,7 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
docID: types.ID(res.DocumentId),
}

return nil
return c.watch(ctx, doc)
}

// Detach detaches the given document from this client. It tells the
Expand Down Expand Up @@ -394,12 +400,12 @@ func (c *Client) Sync(ctx context.Context, options ...SyncOptions) error {
return nil
}

// Watch subscribes to events on a given documentIDs.
// watch subscribes to events on a given documentIDs.
// If an error occurs before stream initialization, the second response, error,
// is returned. If the context "ctx" is canceled or timed out, returned channel
// is closed, and "WatchResponse" from this closed channel has zero events and
// nil "Err()".
func (c *Client) Watch(
func (c *Client) watch(
ctx context.Context,
doc *document.Document,
) (<-chan WatchResponse, error) {
Expand Down
13 changes: 5 additions & 8 deletions test/bench/grpc_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func BenchmarkRPC(b *testing.B) {
assert.NoError(b, err)

d1 := document.New("doc1")
err = cli.Attach(ctx, d1)
_, err = cli.Attach(ctx, d1)
assert.NoError(b, err)

for i := 0; i < b.N; i++ {
Expand All @@ -203,8 +203,9 @@ func BenchmarkRPC(b *testing.B) {
ctx := context.Background()

d1 := document.New(helper.TestDocKey(b))
err := c1.Attach(ctx, d1)
rch1, err := c1.Attach(ctx, d1)
assert.NoError(b, err)
assert.NotNil(b, rch1)
testKey1 := "testKey1"
err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewText(testKey1)
Expand All @@ -213,7 +214,8 @@ func BenchmarkRPC(b *testing.B) {
assert.NoError(b, err)

d2 := document.New(helper.TestDocKey(b))
err = c2.Attach(ctx, d2)
rch2, err = c2.Attach(ctx, d2)

Check failure on line 217 in test/bench/grpc_bench_test.go

View workflow job for this annotation

GitHub Actions / build (1.19.2)

undefined: rch2
assert.NotNil(b, rch2)

Check failure on line 218 in test/bench/grpc_bench_test.go

View workflow job for this annotation

GitHub Actions / build (1.19.2)

undefined: rch2
assert.NoError(b, err)
testKey2 := "testKey2"
err = d2.Update(func(root *json.Object, p *presence.Presence) error {
Expand All @@ -222,11 +224,6 @@ func BenchmarkRPC(b *testing.B) {
})
assert.NoError(b, err)

rch1, err := c1.Watch(ctx, d1)
assert.NoError(b, err)
rch2, err := c2.Watch(ctx, d2)
assert.NoError(b, err)

done1 := make(chan bool)
done2 := make(chan bool)

Expand Down
14 changes: 9 additions & 5 deletions test/integration/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ func TestAdmin(t *testing.T) {
assert.Equal(t, codes.NotFound, status.Convert(err).Code())

// 02. client creates a document then admin removes the document.
assert.NoError(t, cli.Attach(ctx, d1))
rch, err := cli.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)
err = adminCli.RemoveDocument(ctx, "default", d1.Key().String(), true)
assert.NoError(t, err)
assert.Equal(t, document.StatusAttached, d1.Status())
Expand All @@ -89,11 +91,11 @@ func TestAdmin(t *testing.T) {

// 01. c1 attaches and watches d1.
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
rch, err := c1.Attach(watchCtx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)
wg := sync.WaitGroup{}
wg.Add(1)
rch, err := c1.Watch(watchCtx, d1)
assert.NoError(t, err)
go func() {
defer wg.Done()

Expand Down Expand Up @@ -138,7 +140,9 @@ func TestAdmin(t *testing.T) {
assert.Equal(t, codes.NotFound, status.Convert(err).Code())

// 02. try to remove document that is attached by the client.
assert.NoError(t, cli.Attach(ctx, doc))
rch, err := cli.Attach(ctx, doc)
assert.NoError(t, err)
assert.NotNil(t, rch)
err = adminCli.RemoveDocument(ctx, "default", doc.Key().String(), false)
assert.Equal(t, codes.FailedPrecondition, status.Convert(err).Code())
assert.Equal(t, document.StatusAttached, doc.Status())
Expand Down
7 changes: 3 additions & 4 deletions test/integration/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,18 @@ func TestServer(t *testing.T) {
assert.NoError(t, cli.Activate(ctx))

doc := document.New(helper.TestDocKey(t))
assert.NoError(t, cli.Attach(ctx, doc))

rch, err := cli.Attach(ctx, doc)
wg := sync.WaitGroup{}
wrch, err := cli.Watch(ctx, doc)
assert.NoError(t, err)
assert.NotNil(t, rch)

go func() {
for {
select {
case <-ctx.Done():
assert.Fail(t, "unexpected ctx done")
return
case wr := <-wrch:
case wr := <-rch:
if wr.Err == io.EOF || status.Code(wr.Err) == codes.Canceled {
assert.Len(t, wr.Presences, 0)
wg.Done()
Expand Down
38 changes: 26 additions & 12 deletions test/integration/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ func TestArray(t *testing.T) {
t.Run("causal nested array test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").
Expand All @@ -50,17 +51,19 @@ func TestArray(t *testing.T) {
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

syncClientsThenAssertEqual(t, []clientAndDocPair{{c1, d1}, {c2, d2}})
})

t.Run("concurrent array add/delete simple test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddString("v1", "v2")
Expand All @@ -72,8 +75,9 @@ func TestArray(t *testing.T) {
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.GetArray("k1").Delete(1)
Expand All @@ -93,8 +97,9 @@ func TestArray(t *testing.T) {
t.Run("concurrent array add/delete test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddString("v1")
Expand All @@ -105,8 +110,9 @@ func TestArray(t *testing.T) {
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.GetArray("k1").AddString("v2", "v3")
Expand All @@ -127,8 +133,9 @@ func TestArray(t *testing.T) {
t.Run("concurrent array delete test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddString("v1", "v2", "v3")
Expand All @@ -139,8 +146,9 @@ func TestArray(t *testing.T) {
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.GetArray("k1").Delete(1)
Expand All @@ -166,8 +174,9 @@ func TestArray(t *testing.T) {
t.Run("concurrent array move test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddInteger(0, 1, 2)
Expand All @@ -179,8 +188,9 @@ func TestArray(t *testing.T) {
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
prev := root.GetArray("k1").Get(0)
Expand All @@ -206,15 +216,19 @@ func TestArray(t *testing.T) {
t.Run("concurrent array move with the same position test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)
assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddInteger(0, 1, 2)
assert.Equal(t, `{"k1":[0,1,2]}`, root.Marshal())
return nil
}))
assert.NoError(t, c1.Sync(ctx))
d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error {
next := root.GetArray("k1").Get(0)
Expand Down
13 changes: 6 additions & 7 deletions test/integration/auth_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ func TestProjectAuthWebhook(t *testing.T) {
defer func() { assert.NoError(t, cli.Deactivate(ctx)) }()

doc := document.New(helper.TestDocKey(t))
assert.NoError(t, cli.Attach(ctx, doc))
rch, err := cli.Attach(ctx, doc)
assert.NoError(t, err)
assert.NotNil(t, rch)

// client without token
cliWithoutToken, err := client.Dial(
Expand Down Expand Up @@ -175,11 +177,8 @@ func TestProjectAuthWebhook(t *testing.T) {
assert.NoError(t, err)

doc := document.New(helper.TestDocKey(t))
err = cli.Attach(ctx, doc)
_, err = cli.Attach(ctx, doc)
assert.Equal(t, codes.Unauthenticated, status.Convert(err).Code())

_, err = cli.Watch(ctx, doc)
assert.Equal(t, client.ErrDocumentNotAttached, err)
})
}

Expand Down Expand Up @@ -225,7 +224,7 @@ func TestAuthWebhook(t *testing.T) {
assert.NoError(t, err)

doc := document.New(helper.TestDocKey(t))
err = cli.Attach(ctx, doc)
_, err = cli.Attach(ctx, doc)
assert.NoError(t, err)
})

Expand Down Expand Up @@ -320,7 +319,7 @@ func TestAuthWebhook(t *testing.T) {
assert.NoError(t, err)

doc := document.New(helper.TestDocKey(t))
err = cli.Attach(ctx, doc)
_, err = cli.Attach(ctx, doc)
assert.NoError(t, err)

// 01. multiple requests to update the document.
Expand Down
13 changes: 9 additions & 4 deletions test/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,14 @@ func TestClient(t *testing.T) {

// 01. c1, c2, c3 attach to the same document.
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
_, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
_, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
d3 := document.New(helper.TestDocKey(t))
assert.NoError(t, c3.Attach(ctx, d3))
_, err = c3.Attach(ctx, d3)
assert.NoError(t, err)

// 02. c1, c2 sync with push-pull mode.
assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error {
Expand Down Expand Up @@ -136,7 +139,9 @@ func TestClient(t *testing.T) {
// 01. cli attach to the same document having counter.
ctx := context.Background()
doc := document.New(helper.TestDocKey(t))
assert.NoError(t, cli.Attach(ctx, doc))
rch, err := cli.Attach(ctx, doc)
assert.NoError(t, err)
assert.NotNil(t, rch)

// 02. cli update the document with creating a counter
// and sync with push-pull mode: CP(1, 1) -> CP(2, 2)
Expand Down
Loading

0 comments on commit 7dbefe0

Please sign in to comment.