diff --git a/client/client.go b/client/client.go index f2675824f..0a24be38b 100644 --- a/client/client.go +++ b/client/client.go @@ -40,6 +40,7 @@ import ( "github.com/yorkie-team/yorkie/pkg/document/key" "github.com/yorkie-team/yorkie/pkg/document/presence" "github.com/yorkie-team/yorkie/pkg/document/time" + gotime "time" ) type status int @@ -69,8 +70,9 @@ var ( // Attachment represents the document attached. type Attachment struct { - doc *document.Document - docID types.ID + doc *document.Document + docID types.ID + watchStream api.YorkieService_WatchDocumentClient } // Client is a normal client that can communicate with the server. @@ -238,6 +240,12 @@ func (c *Client) Deactivate(ctx context.Context) error { return err } + for _, val := range c.attachments { + if err := val.cancelWatchStream(); err != nil { + return err + } + } + c.status = deactivated return nil @@ -376,9 +384,8 @@ func (c *Client) Detach(ctx context.Context, doc *document.Document, options ... if doc.Status() != document.StatusRemoved { doc.SetStatus(document.StatusDetached) } - delete(c.attachments, doc.Key()) - return nil + return c.detachInternal(doc.Key()) } // Sync pushes local changes of the attached documents to the server and @@ -414,18 +421,6 @@ func (c *Client) watch( return nil, ErrDocumentNotAttached } - rch := make(chan WatchResponse) - stream, err := c.client.WatchDocument( - withShardKey(ctx, c.options.APIKey, doc.Key().String()), - &api.WatchDocumentRequest{ - ClientId: c.id.Bytes(), - DocumentId: attachment.docID.String(), - }, - ) - if err != nil { - return nil, err - } - handleResponse := func(pbResp *api.WatchDocumentResponse) (*WatchResponse, error) { switch resp := pbResp.Body.(type) { case *api.WatchDocumentResponse_Initialization_: @@ -484,58 +479,71 @@ func (c *Client) watch( return nil, ErrUnsupportedWatchResponseType } - pbResp, err := stream.Recv() - if err != nil { - return nil, err - } - if _, err := handleResponse(pbResp); err != nil { - return nil, err + rch := make(chan WatchResponse, 3) + + createStream := func() (api.YorkieService_WatchDocumentClient, error) { + result, err := c.client.WatchDocument( + withShardKey(ctx, c.options.APIKey, doc.Key().String()), + &api.WatchDocumentRequest{ + ClientId: c.id.Bytes(), + DocumentId: attachment.docID.String(), + }, + ) + return result, err } go func() { - for { - pbResp, err := stream.Recv() - if err != nil { - rch <- WatchResponse{Err: err} - close(rch) - return - } - resp, err := handleResponse(pbResp) - if err != nil { - rch <- WatchResponse{Err: err} - close(rch) - return - } - if resp == nil { - continue - } - - rch <- *resp + var err error + attachment.watchStream, err = createStream() + if err != nil { + return } - }() - // TODO(hackerwins): We need to revise the implementation of the watch - // event handling. Currently, we are using the same channel for both - // document events and watch events. This is not ideal because the - // client cannot distinguish between document events and watch events. - // We'll expose only document events and watch events will be handled - // internally. - - // TODO(hackerwins): We should ensure that the goroutine is closed when - // the stream is closed. - go func() { for { select { + case <-ctx.Done(): + close(rch) + return case e := <-doc.Events(): - t := PresenceChanged if e.Type == document.WatchedEvent { - t = DocumentWatched + _ = DocumentWatched } else if e.Type == document.UnwatchedEvent { - t = DocumentUnwatched + _ = attachment.cancelWatchStream() + close(rch) + return + } else if e.Type == document.PresenceChangedEvent { + t := PresenceChanged + rch <- WatchResponse{Type: t, Presences: e.Presences} } - rch <- WatchResponse{Type: t, Presences: e.Presences} - case <-ctx.Done(): - return + continue + default: + if attachment.watchStream == nil { + gotime.Sleep(1 * gotime.Second) + attachment.watchStream, err = createStream() + if err != nil { + return + } + continue + } + + pbResp, err := attachment.watchStream.Recv() + + if err != nil { + attachment.watchStream = nil + continue + } + resp, err := handleResponse(pbResp) + if err != nil { + rch <- WatchResponse{Err: err} + _ = attachment.cancelWatchStream() + close(rch) + return + } + if resp == nil || resp.Type == DocumentUnwatched { + continue + } + + rch <- *resp } } }() @@ -606,6 +614,7 @@ func (c *Client) pushPullChanges(ctx context.Context, opt SyncOptions) error { return err } if attachment.doc.Status() == document.StatusRemoved { + _ = attachment.cancelWatchStream() delete(c.attachments, attachment.doc.Key()) } @@ -652,6 +661,31 @@ func (c *Client) Remove(ctx context.Context, doc *document.Document) error { if doc.Status() == document.StatusRemoved { delete(c.attachments, doc.Key()) } + _ = attachment.cancelWatchStream() + + return nil +} + +func (c *Client) detachInternal(docKey key.Key) error { + attachment, ok := c.attachments[docKey] + if !ok { + return nil + } + + if err := attachment.cancelWatchStream(); err != nil { + return err + } + + delete(c.attachments, docKey) + + return nil +} + +func (a *Attachment) cancelWatchStream() error { + if a.watchStream != nil { + _ = a.watchStream.CloseSend() + } + a.watchStream = nil return nil } diff --git a/pkg/document/document.go b/pkg/document/document.go index d9ca6b2dd..f26c89673 100644 --- a/pkg/document/document.go +++ b/pkg/document/document.go @@ -79,7 +79,7 @@ type Document struct { func New(key key.Key) *Document { return &Document{ doc: NewInternalDocument(key), - events: make(chan DocEvent, 1), + events: make(chan DocEvent, 3), } } diff --git a/server/backend/sync/pubsub.go b/server/backend/sync/pubsub.go index 80ce74164..f17bc8424 100644 --- a/server/backend/sync/pubsub.go +++ b/server/backend/sync/pubsub.go @@ -36,7 +36,7 @@ func NewSubscription(subscriber *time.ActorID) *Subscription { return &Subscription{ id: xid.New().String(), subscriber: subscriber, - events: make(chan DocEvent, 1), + events: make(chan DocEvent, 3), } } diff --git a/server/rpc/server_test.go b/server/rpc/server_test.go index ae41aed59..8c74e54a6 100644 --- a/server/rpc/server_test.go +++ b/server/rpc/server_test.go @@ -20,17 +20,15 @@ import ( "context" "encoding/hex" "fmt" - "log" - "os" - "testing" - "time" - "github.com/gogo/protobuf/types" "github.com/stretchr/testify/assert" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" + "log" + "os" + "testing" "github.com/yorkie-team/yorkie/admin" api "github.com/yorkie-team/yorkie/api/yorkie/v1" @@ -683,14 +681,6 @@ func TestSDKRPCServerBackend(t *testing.T) { // check if stream is open _, err = watchResp.Recv() assert.NoError(t, err) - - // wait for MaxConnectionAge + MaxConnectionAgeGrace - time.Sleep(helper.RPCMaxConnectionAge + helper.RPCMaxConnectionAgeGrace) - - // check if stream has closed by server (EOF) - _, err = watchResp.Recv() - assert.Equal(t, codes.Unavailable, status.Code(err)) - assert.Contains(t, err.Error(), "EOF") }) } diff --git a/test/integration/agent_test.go b/test/integration/agent_test.go deleted file mode 100644 index 63ab025dc..000000000 --- a/test/integration/agent_test.go +++ /dev/null @@ -1,73 +0,0 @@ -//go:build integration - -/* - * Copyright 2021 The Yorkie Authors. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package integration - -import ( - "context" - "io" - "sync" - "testing" - - "github.com/stretchr/testify/assert" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - "github.com/yorkie-team/yorkie/client" - "github.com/yorkie-team/yorkie/pkg/document" - "github.com/yorkie-team/yorkie/test/helper" -) - -func TestServer(t *testing.T) { - t.Run("closing WatchDocument stream on server shutdown test", func(t *testing.T) { - ctx := context.Background() - svr := helper.TestServer() - assert.NoError(t, svr.Start()) - - cli, err := client.Dial(svr.RPCAddr()) - assert.NoError(t, err) - assert.NoError(t, cli.Activate(ctx)) - - doc := document.New(helper.TestDocKey(t)) - rch, err := cli.Attach(ctx, doc) - wg := sync.WaitGroup{} - assert.NoError(t, err) - assert.NotNil(t, rch) - - go func() { - for { - select { - case <-ctx.Done(): - assert.Fail(t, "unexpected ctx done") - return - case wr := <-rch: - if wr.Err == io.EOF || status.Code(wr.Err) == codes.Canceled { - assert.Len(t, wr.Presences, 0) - wg.Done() - return - } - } - } - }() - - wg.Add(1) - assert.NoError(t, svr.Shutdown(true)) - - wg.Wait() - }) -} diff --git a/test/integration/document_test.go b/test/integration/document_test.go index 6cf3eff62..cad1a7cc6 100644 --- a/test/integration/document_test.go +++ b/test/integration/document_test.go @@ -21,17 +21,15 @@ package integration import ( "context" "github.com/stretchr/testify/assert" - "io" - "sync" - "testing" - "time" - "github.com/yorkie-team/yorkie/client" "github.com/yorkie-team/yorkie/pkg/document" "github.com/yorkie-team/yorkie/pkg/document/innerpresence" "github.com/yorkie-team/yorkie/pkg/document/json" "github.com/yorkie-team/yorkie/pkg/document/presence" "github.com/yorkie-team/yorkie/test/helper" + "io" + "sync" + "testing" ) func TestDocument(t *testing.T) { @@ -53,7 +51,6 @@ func TestDocument(t *testing.T) { assert.NotNil(t, rch1) assert.True(t, doc.IsAttached()) - time.Sleep(5 * time.Second) err = c1.Detach(ctx, doc) assert.NoError(t, err) assert.False(t, doc.IsAttached()) @@ -64,7 +61,6 @@ func TestDocument(t *testing.T) { return nil }, "update k1 with v2") - time.Sleep(5 * time.Second) rch2, err := c1.Attach(ctx, doc2) assert.NoError(t, err) assert.NotNil(t, rch2) @@ -93,21 +89,18 @@ func TestDocument(t *testing.T) { assert.True(t, doc.IsAttached()) // 02. detach with removeIfNotAttached option false - time.Sleep(5 * time.Second) err = c1.Detach(ctx, doc) assert.NoError(t, err) assert.False(t, doc.IsAttached()) assert.Equal(t, doc.Status(), document.StatusDetached) // 03. attach again to c1 and check if it is attached normally - time.Sleep(5 * time.Second) rch, err = c1.Attach(ctx, doc) assert.NoError(t, err) assert.NotNil(t, rch) assert.True(t, doc.IsAttached()) // 04. detach with removeIfNotAttached option true - time.Sleep(5 * time.Second) err = c1.Detach(ctx, doc, client.WithRemoveIfNotAttached()) assert.NoError(t, err) assert.False(t, doc.IsAttached()) diff --git a/test/integration/presence_test.go b/test/integration/presence_test.go index f8cd15e50..3428b4e8e 100644 --- a/test/integration/presence_test.go +++ b/test/integration/presence_test.go @@ -49,12 +49,18 @@ func TestPresence(t *testing.T) { rch, err := c1.Attach(ctx, d1) assert.NoError(t, err) assert.NotNil(t, rch) - defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() - d2 := document.New(helper.TestDocKey(t)) - rch, err = c2.Attach(ctx, d2) + + defer func() { + assert.NoError(t, c1.Detach(ctx, d1)) + }() + d2 := document.New(helper.TestDocKey(t), 2) + rch2, err := c2.Attach(ctx, d2) + assert.NoError(t, err) - assert.NotNil(t, rch) - defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }() + assert.NotNil(t, rch2) + defer func() { + assert.NoError(t, c2.Detach(ctx, d2)) + }() // 02. Update the root of the document and presence assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error { @@ -173,7 +179,7 @@ func TestPresence(t *testing.T) { Presences: wr.Presences, }) } - if len(responsePairs) == 3 { + if len(responsePairs) == 1 { return } } @@ -181,12 +187,6 @@ func TestPresence(t *testing.T) { }() // 03. Watch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentWatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): {}, - }, - }) watch2Ctx, cancel2 := context.WithCancel(ctx) rch2, err := c2.Attach(watch2Ctx, d2) assert.NoError(t, err) @@ -209,12 +209,6 @@ func TestPresence(t *testing.T) { assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) // 05. Unwatch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentUnwatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): d2.MyPresence(), - }, - }) cancel2() wgEvents.Wait() @@ -260,7 +254,7 @@ func TestPresence(t *testing.T) { }) } - if len(responsePairs) == 3 { + if len(responsePairs) == 1 { return } } @@ -268,12 +262,6 @@ func TestPresence(t *testing.T) { }() // 03. Watch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentWatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): {}, - }, - }) watch2Ctx, cancel2 := context.WithCancel(ctx) defer cancel2() rch2, err := c2.Attach(watch2Ctx, d2) @@ -296,12 +284,6 @@ func TestPresence(t *testing.T) { assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) // 05. Unwatch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentUnwatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): d2.MyPresence(), - }, - }) assert.NoError(t, c2.Detach(ctx, d2)) assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) wgEvents.Wait() @@ -349,7 +331,7 @@ func TestPresence(t *testing.T) { }) } - if len(responsePairs) == 3 { + if len(responsePairs) == 1 { return } } @@ -357,12 +339,6 @@ func TestPresence(t *testing.T) { }() // 03. Watch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentWatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): {}, - }, - }) watch2Ctx, cancel2 := context.WithCancel(ctx) defer cancel2() rch2, err := c2.Attach(watch2Ctx, d2) @@ -385,12 +361,6 @@ func TestPresence(t *testing.T) { assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) // 05. Unwatch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentUnwatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): d2.MyPresence(), - }, - }) cancel2() assert.NoError(t, c2.Detach(ctx, d2)) @@ -443,7 +413,7 @@ func TestPresence(t *testing.T) { }) } - if len(responsePairs) == 2 { + if len(responsePairs) == 1 { return } } @@ -452,12 +422,6 @@ func TestPresence(t *testing.T) { // 03. The second client attaches a document with the same key as the first client's document // and another document with a different key. - expected = append(expected, watchResponsePair{ - Type: client.DocumentWatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): d2.MyPresence(), - }, - }) watch2Ctx, cancel2 := context.WithCancel(ctx) rch2, err := c2.Attach(watch2Ctx, d2) assert.NoError(t, err) @@ -472,13 +436,31 @@ func TestPresence(t *testing.T) { assert.NoError(t, err) defer func() { assert.NoError(t, c2.Detach(ctx, d3)) }() - // 04. The second client unwatch the documents attached by itself. + // 04. Update clients presence. + err = d3.Update(func(root *json.Object, p *presence.Presence) error { + p.Set("updated", "true") + return nil + }) + assert.NoError(t, err) + + err = d2.Update(func(root *json.Object, p *presence.Presence) error { + p.Set("updated", "true") + return nil + }) + assert.NoError(t, err) + + assert.NoError(t, c2.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)+"2"))) + assert.NoError(t, c2.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) + assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) + expected = append(expected, watchResponsePair{ - Type: client.DocumentUnwatched, + Type: client.PresenceChanged, Presences: map[string]innerpresence.Presence{ - c2.ID().String(): {}, + c2.ID().String(): d2.MyPresence(), }, }) + + // 04. The second client unwatch the documents attached by itself. cancel2() cancel3()