diff --git a/design/media/presence-api.png b/design/media/presence-api.png new file mode 100644 index 000000000..8c04934d8 Binary files /dev/null and b/design/media/presence-api.png differ diff --git a/design/media/presence-event-presence-changed.png b/design/media/presence-event-presence-changed.png new file mode 100644 index 000000000..19aa371e5 Binary files /dev/null and b/design/media/presence-event-presence-changed.png differ diff --git a/design/media/presence-event-unwatched.png b/design/media/presence-event-unwatched.png new file mode 100644 index 000000000..d5710ccce Binary files /dev/null and b/design/media/presence-event-unwatched.png differ diff --git a/design/media/presence-event-watched.png b/design/media/presence-event-watched.png new file mode 100644 index 000000000..5e467d61d Binary files /dev/null and b/design/media/presence-event-watched.png differ diff --git a/design/media/presence-structure.png b/design/media/presence-structure.png new file mode 100644 index 000000000..6707de174 Binary files /dev/null and b/design/media/presence-structure.png differ diff --git a/design/media/pubsub.jpg b/design/media/pubsub.jpg new file mode 100644 index 000000000..81daa7ca0 Binary files /dev/null and b/design/media/pubsub.jpg differ diff --git a/design/media/pubsub.png b/design/media/pubsub.png deleted file mode 100644 index 7bb03f378..000000000 Binary files a/design/media/pubsub.png and /dev/null differ diff --git a/design/media/server-streaming.jpg b/design/media/server-streaming.jpg new file mode 100644 index 000000000..aeaf3254d Binary files /dev/null and b/design/media/server-streaming.jpg differ diff --git a/design/media/watch-document.jpg b/design/media/watch-document.jpg new file mode 100644 index 000000000..d6f861bbe Binary files /dev/null and b/design/media/watch-document.jpg differ diff --git a/design/peer-awareness.md b/design/peer-awareness.md deleted file mode 100644 index c2a5f806c..000000000 --- a/design/peer-awareness.md +++ /dev/null @@ -1,99 +0,0 @@ ---- -title: peer-awareness -target-version: 0.1.2 ---- - -# Peer Awareness - -## Summary - -We will provide Peer Awareness which is a simple algorithm that manages end-user -status like who is connected and metadata like username or email address and -etc. For example, users can implement a list of people participating in the -editing, such as a box in the top right of Google Docs. - -### Goals - -Implement Peer Awareness and provide API to users to use the feature. The goal -of the first version is to implement simple functionality and check usability. - -### Non-Goals - -The first version does not implement complex features such as dynamic metadata -updates. - -## Proposal details - -### How to use - -Users can pass metadata along with client options when creating a client. - -```typescript -const client = yorkie.createClient('https://yorkie.dev/api', { - metadata: { - username: 'hackerwins' - } -}); -``` - -Then the users create a document in the usual way and attach it to the client. - -```typescript -const doc = yorkie.createDocument('examples', 'codemirror'); -await client.attach(doc); -``` - -When a new peer registers or leaves, `peers-changed` event is fired, and the -other peer's clientID and metadata can be obtained from the event. - -```typescript -client.subscribe((event) => { - if (event.name === 'peers-changed') { - const peers = event.value[doc.getKey().toIDString()]; - for (const [clientID, metadata] of Object.entries(peers)) { - console.log(clientID, metadata); - } - } -}); -``` - -### How does it work? - -``` - +--Client "A"----+ +--Agent-----------------+ - | +--Metadata--+ | | +--PubSub -----------+ | - | | { num: 1 } | <-- WatchDocuments --> | | { | | - | +------------+ | | | docA: { | | - +----------------+ | | A: { num: 1 }, | | - | | B: { num: 2 } | | - +--Client "B"----+ | | }, | | - | +--Metadata--+ | | | ... | | - | | { num: 2 } | <-- WatchDocuments --> | | } | | - | +------------+ | | +--------------------+ | - +----------------+ +------------------------+ -``` - -When a client attaches documents, a stream is connected between agent and the -client through WatchDocuments API. This will update the map of clients that are -watching the documents in PubSub. When the stream disconnects or a new -connection is made, `DOCUMENTS_UNWATCHED` or `DOCUMENTS_WATCHED` event is -delivered to other clients who are watching the document together. - -### Risks and Mitigation - -The first version is missing the ability to dynamically update metadata and -propagate it to other peers. Client Metadata is managed inside the instance of -the Client and is not stored persistently in Yorkie. The reasons are as follows: - -- The goal of the first version is to check the usability of the feature. -- Metadata's primary "source of truth" location is user's DB, and it is simply - passed to Yorkie. -- All other locations of the metadata in Yorkie just refer back to the primary " - source of truth" location. -- We can prevent increasing management points caused by storing metadata in - MongoDB. - -In the future, if the users needs arise, we may need to implement the ability to -dynamically update metadata and propagates it to peers. We might consider -treating it as a Yorkie Document that has logical clocks, not a normal map in -PubSub. diff --git a/design/presence.md b/design/presence.md new file mode 100644 index 000000000..b82b5baaf --- /dev/null +++ b/design/presence.md @@ -0,0 +1,331 @@ +--- +title: presence +target-version: 0.4.6 +--- + +# Presence + +## Summary + +In collaborative applications, it is important to inform users about who is currently participating in the document and what their status is. To represent this peer awareness, yorkie provides the `Presence` feature. Presence can be used to share state between clients, such as remote cursors, selections, etc. Previously, there was an issue where presence is belong to `Client`, preventing batch processing with `Document`. To address this, presence has been moved from `Client` to `Document`, allowing for batch processing. + +### Goals + +The goal is to move `Presence` from `Client` to `Document` and enable batch processing with both presence and document. + +### Non-Goals + +Distinguishing between users who are not participating in document editing and users in an offline state is not covered. Users can only be aware of online peers. + +## Proposal details + +### Structure Comparison + +![Presence Structure Comparison](media/presence-structure.png) + +Previously, presence is belong to `Client`, leading to the following issues: + +1. Inefficient management of presences for each document when a client participates in more than one document. +2. Inability to edit presence and document together (no batch processing). + +In the new structure, presence is belong to `Document`, and it can be updated together with document updates through `document.update()`, enabling atomic batch processing. + +### How to use + +#### Set presence + +When attaching, the client informs the server that it is subscribing to the document. The `initialPresence` option sets the initial presence of the client. If not set, it is created as an empty object. The presence is shared with other users participating in the document. It must be serializable to JSON. + +```typescript +const doc = new yorkie.Document('docKey'); +await client.attach(doc, { + initialPresence: { color: 'blue', cursor: { x: 0, y: 0 } }, +}); +``` + +#### Get presence + +Only retrieve the presence of online clients. + +```typescript +// Get the presence of the current user +doc.getMyPresence(); // { color: 'blue', cursor: { x: 1, y: 1 } } + +// Get the presence of the client +doc.getPresence(clientID); + +// Get all users currently participating in the document +const users = doc.getPresences(); +for (const { clientID, presence } of users) { + // Do something... +} +``` + +#### Update presence + +Changes specific properties provided to `presence.set()` within `document.update()`. The existing presence object is updated by merging the new changes. In other words, properties not specified will remain unchanged. + +Change within the `document.update()` function are processed atomically, and other clients receive a single change. Subscribers are called only after the change is applied. + +```typescript +doc.update((root, presence) => { + presence.set({ cursor: { x: 1, y: 1 } }); +}); +``` + +#### Subscribe presence events + +`doc.subscribe("presence")` allows subscribing to all presence-related changes. To distinguish between events from oneself and others, `"my-presence"` and `"others"` topics can be used. Subscribing to these events notifies clients when others watch, unwatch, or modify their presence. + +When a new client establishes a watch stream connection with a document, it immediately receives an `initialized` event representing all users participating in the document. (Note: Create the document, subscribe, and then attach to receive the initialized event. You won't receive the initialized event if you subscribe after attaching.) + +```typescript +const unsubscribe = doc.subscribe('presence', (event) => { + if (event.type === 'initialized') { + // Array of users currently participating in the document + } + + if (event.type === 'watched') { + // A user has joined the document editing in online + } + + if (event.type === 'unwatched') { + // A user has left the document editing + } + + if (event.type === 'presence-changed') { + // A user has updated their presence + } +}); + +const unsubscribe = doc.subscribe('my-presence', (event) => { + // evnet.type can be initialized, presence-changed +}); + +const unsubscribe = doc.subscribe('others', (event) => { + // evnet.type can be watched, unwatched, presence-changed +}); +``` + +### How does it work? + +#### Presence Change + +In yorkie, changes to the document are conveyed through a `Change` in the `ChangePack`. To apply changes in the document along with updates to presence, a `PresenceChange` has been included within the `Change`. + +```proto +// resources.proto +message Change { + ChangeID id = 1; + string message = 2; + repeated Operation operations = 3; + PresenceChange presence_change = 4; // this is added +} + +message PresenceChange { + enum ChangeType { + CHANGE_TYPE_UNSPECIFIED = 0; + CHANGE_TYPE_PUT = 1; + CHANGE_TYPE_DELETE = 2; + CHANGE_TYPE_CLEAR = 3; + } + ChangeType type = 1; + Presence presence = 2; +} + +message Presence { + map data = 1; +} +``` + +![Presence-Change within Change](media/presence-api.png) + +When attaching a document, the initial presence is transmitted through `PresenceChange({type: 'put', presence: P})`. When updating presence via `document.update()`, the modified presence is transmitted through `PresenceChange({type: 'put', presence: P})`. When detaching the document, `PresenceChange({type: 'clear'})` is sent to remove the presence. (`P` represents the presence object.) + +#### Presence Events + +##### `presence-changed` event + +![presence-changed event](media/presence-event-presence-changed.png) + +When clientA updates presence using `doc.update()`, clientB receives a `presence-changed` event. Modification within the update function are atomic and sent as a single change to other clients, including operations(`Ops`) and presenceChange(`P`). ClientB applies the change and triggers the subscribers only after applying change. If both root changes and presence changes exist, a `remote-change` event occurs, and then the `presence-changed` event occurs. + +```typescript +public applyChanges(changes: Array>): void { + // ... + for (const change of changes) { + let changeInfo: ChangeInfo | undefined; + let presenceEvent: WatchedEvent

| UnwatchedEvent

| PresenceChangedEvent

| undefined; + // Apply a change + const opInfos = change.execute(this.root, this.presences); + + // Fire DocEvent only after the change is applied. + if (changeInfo) { + this.publish({ + type: DocEventType.RemoteChange, + value: changeInfo, + }); + } + if (presenceEvent) { + this.publish(presenceEvent); + } + } +} +``` + +##### `watched` event + +![watched event](media/presence-event-watched.png) + +When a client attaches to a document, it establishes watch stream connection with the server through the `WatchDocument` API. When the stream disconnects or a new connection is established, `DocumentUnWatched` or `DocumentWatched` event is sent to other clients who are watching the document. (The dotted line represents the watch stream. For information on how the watch stream operates, please refer to the [pub-sub](./pub-sub.md) documentation.) + +The client maintains a `presences` map for client presence and an `onlineClients` set for online (watching) clients. The server manages who is watching a document through `subscriptionsMapByDocID`. + +When clientA attaches while clientB is already watching the document, two scenarios exist depending on the order of receiving `DocumentChanged` and `DocumentWatched` in the watch stream response: + +1. `DocumentChanged(pushpull)` -> `DocumentWatched` + ClientA attaches, and an initial presence change is sent. Since changes occur in the document, the server informs clientB with `DocumentChanged`. ClientB sends a `pushpullChanges` request, receives clientA's initial presence, and adds it to the `presences` map. + Once clientA receives the AttachDocument response, it requests a `WatchDocument` API, establishing a watch stream and receiving the list of current document participants(`initalized`). The server notifies clientB that clientA is watching. ClientB adds clientA to the `onlineClients` set and triggers the `watched` event. + +2. `DocumentWatched` -> `DocumentChanged(pushpull)` + If `DocumentWatched` arrives before pushpull response, clientB can't trigger the `watched` event immediately due to the absence of presence data. Later, when `pushpull` delivers clientA's presence, if the presence didn't exist previously, it means that the initial presence was received. In this case, clientB triggers the `watched` event. + +```typescript +// client.ts +private handleWatchDocumentsResponse( + attachment: Attachment, + resp: WatchDocumentResponse, +) { + const pbWatchEvent = resp.getEvent()!; + const eventType = pbWatchEvent.getType(); + const publisher = converter.toHexString(pbWatchEvent.getPublisher_asU8()); + switch (eventType) { + // ... + case PbDocEventType.DOC_EVENT_TYPE_DOCUMENTS_WATCHED: + attachment.doc.addOnlineClient(publisher); + // NOTE(chacha912): We added to onlineClients, but we won't trigger watched event + // unless we also know their initial presence data at this point. + if (attachment.doc.hasPresence(publisher)) { + attachment.doc.publish({ + type: DocEventType.Watched, + value: { + clientID: publisher, + presence: attachment.doc.getPresence(publisher)!, + }, + }); + } + break; + } +} + + +// document.ts +public applyChanges(changes: Array>): void { + // ... + for (const change of changes) { + let changeInfo: ChangeInfo | undefined; + let presenceEvent: WatchedEvent

| UnwatchedEvent

| PresenceChangedEvent

| undefined; + const actorID = change.getID().getActorID()!; + + if (change.hasPresenceChange() && this.onlineClients.has(actorID)) { + const presenceChange = change.getPresenceChange()!; + switch (presenceChange.type) { + // ... + case PresenceChangeType.Put: + // NOTE(chacha912): When the user exists in onlineClients, but + // their presence was initially absent, we can consider that we have + // received their initial presence, so trigger the 'watched' event. + presenceEvent = { + type: this.presences.has(actorID) + ? DocEventType.PresenceChanged + : DocEventType.Watched, + value: { + clientID: actorID, + presence: presenceChange.presence, + }, + }; + break; + } + } + } +} +``` + +##### `unwatched` event + +![unwatched event](media/presence-event-unwatched.png) + +Consider clientA detaching from the document. Similar to the `watched` scenario, two cases exist for the `unwatched` event: + +1. `DocumentChanged(pushpull)` -> `DocumentUnWatched` + ClientA detaches, sending `PresenceChange({type: 'clear'})`. Since the change occurs, the server informs clientB with `DocumentChanged`. ClientB requests a `pushpullChanges`, receives clientA's `PresenceChange({type: 'clear'})`, and removes clientA from the `presences` map. If clientA exists in `onlineClients`, we can consider it as detachment occurring before unwatching. Since detached user is no longer participating in the document, clientB removes clientA from `onlineClients` and triggers the `unwatched` event. Future `unwatched` event is ignored. +2. `DocumentUnWatched` -> `DocumentChanged(pushpull)` + If clientB receives `DocumentUnWatched` before pushpull response, clientB removes clientA from `onlineClients` and triggers the `unwatched` event. Later, `pushpull` removes clientA's presence from the `presences` map. + +```typescript +// client.ts +private handleWatchDocumentsResponse( + attachment: Attachment, + resp: WatchDocumentResponse, +) { + const pbWatchEvent = resp.getEvent()!; + const eventType = pbWatchEvent.getType(); + const publisher = converter.toHexString(pbWatchEvent.getPublisher_asU8()); + switch (eventType) { + // ... + case PbDocEventType.DOC_EVENT_TYPE_DOCUMENTS_UNWATCHED: { + const presence = attachment.doc.getPresence(publisher); + attachment.doc.removeOnlineClient(publisher); + // NOTE(chacha912): There is no presence, when PresenceChange(clear) is applied before unwatching. + // In that case, the 'unwatched' event is triggered while handling the PresenceChange. + if (presence) { + attachment.doc.publish({ + type: DocEventType.Unwatched, + value: { clientID: publisher, presence }, + }); + } + break; + } +} + + +// document.ts +public applyChanges(changes: Array>): void { + // ... + for (const change of changes) { + let changeInfo: ChangeInfo | undefined; + let presenceEvent: WatchedEvent

| UnwatchedEvent

| PresenceChangedEvent

| undefined; + const actorID = change.getID().getActorID()!; + + if (change.hasPresenceChange() && this.onlineClients.has(actorID)) { + const presenceChange = change.getPresenceChange()!; + switch (presenceChange.type) { + // ... + case PresenceChangeType.Clear: + // NOTE(chacha912): When the user exists in onlineClients, but + // PresenceChange(clear) is received, we can consider it as detachment + // occurring before unwatching. + // Detached user is no longer participating in the document, we remove + // them from the online clients and trigger the 'unwatched' event. + presenceEvent = { + type: DocEventType.Unwatched, + value: { + clientID: actorID, + presence: this.getPresence(actorID)!, + }, + }; + this.removeOnlineClient(actorID); + break; + } + } + } +} +``` + +### Risks and Mitigation + +- If clients don't explicitly detach, presence can accumulate in the `presences` map. We can handle clearing presence during housekeeping. +- Although presence doesn't need to be stored in the database, it is included in `Change` and stored. Later, we may consider changing the storage approach as data grows. ([Refer to Discord's storage changes.](https://blog.bytebytego.com/i/109085468/how-discord-stores-trillions-of-messages)) + - Client, Document → MongoDB (Document DB or RDB) + - Change → HBase (Wide column store) + - Snapshot, Presence → Redis (key-value store, In-memory DB) diff --git a/design/pub-sub.md b/design/pub-sub.md index 25658fcac..52e336275 100644 --- a/design/pub-sub.md +++ b/design/pub-sub.md @@ -1,6 +1,6 @@ --- title: pub-sub -target-version: 0.3.0 +target-version: 0.4.6 --- # PubSub @@ -18,36 +18,46 @@ Documents. ## Proposal Details -### How does it work? +### WatchDocument API + +In Yorkie, we use gRPC-Web and it currently supports 2 RPC modes: unary RPCs, server-side streaming RPCs. ([Client-side and Bi-directional streaming is not supported.](https://github.com/grpc/grpc-web#streaming-support)). Server-side streaming allows the server to send multiple messages in response to a single client request. -Yorkie implements WatchDocuments API -using [gRPC server-side streaming](https://grpc.io/docs/languages/go/basics/#server-side-streaming-rpc) -to deliver the events that have occurred to other clients. +![server-side streaming RPCs](media/server-streaming.jpg) + +Yorkie implements WatchDocument API using [gRPC server-side streaming](https://grpc.io/docs/languages/go/basics/#server-side-streaming-rpc) to deliver the events to other clients. ```protobuf // api/yorkie.proto service Yorkie { ... - rpc WatchDocuments (WatchDocumentsRequest) returns (stream WatchDocumentsResponse) {} + rpc WatchDocument (WatchDocumentRequest) returns (stream WatchDocumentResponse) {} } ``` -And to manage the event delivery target, we are using the [PubSub pattern](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern). You can learn more by looking at the [sync package](https://github.com/yorkie-team/yorkie/blob/main/server/backend/sync/pubsub.go) we are implementing. +In brief, when the client sends a WatchDocument request, it establishes a stream connection(1). On the server side, when changes occur in a document, it retrieves the clients watching that document in the subscriptionsMap(2), and then sends responses through the server stream to those clients(3). As a result, clients watching the document can receive response about its changes through the stream without the separate requests. + +![WatchDocument API](media/watch-document.jpg) + +### How does it work? + +We are using the [PubSub pattern](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern) for handling event delivery targets. For more details, you can check out the [sync package](https://github.com/yorkie-team/yorkie/blob/main/server/backend/sync/pubsub.go) that we're working on. + ![pub-sub pattern vs observer pattern](media/pubsub-pattern.png) The process of the event pub-sub is as follows: -![pub-sub event flow](media/pubsub.png) + +![pub-sub event flow](media/pubsub.jpg) #### 1. Set SubscriptionsMap -The `watchDocuments` API creates a `Subscription` instance and adds it to the `subscriptionsMapByDocKey`. The `Subscription` instance internally manages the `DocEvent channel`, and a `select` statement is used to retrieve events that are passed to the `Subscription` instance. +The `WatchDocument` API creates a `Subscription` instance and adds it to the `subscriptionsMapByDocID`[(code🔍)](https://github.com/yorkie-team/yorkie/blob/16fd182021231d75562a933cb32d924af16fc7f4/server/rpc/yorkie_server.go#L518-L523). The `Subscription` instance internally manages the `DocEvent channel`. ```go // Subscription represents a subscription of a subscriber to documents. type Subscription struct { id string - subscriber types.Client + subscriber *time.ActorID closed bool events chan DocEvent } @@ -55,23 +65,41 @@ type Subscription struct { #### 2. Publish Event -The publisher can send a `DocEvent` to the `Subscription` instances that subscribe to the same document through the `Publish` method. +When changes occur in a document through WatchDocument or PushPull, the `Publish` method is called to send out a `DocEvent` which includes events such as `DocumentsWatchedEvent`, `DocumentsUnwatchedEvent`, and `DocumentsChangedEvent`. ```go -type DocEvent struct { - Type types.DocEventType - Publisher types.Client - DocumentKeys []key.Key +// server/rpc/yorkie_server.go +func (s *yorkieServer) watchDoc(...) (...) { + // Publish DocumentsWatchedEvent during watchDocument + s.backend.Coordinator.Publish( + ctx, + subscription.Subscriber(), + sync.DocEvent{ + Type: types.DocumentsWatchedEvent, + Publisher: subscription.Subscriber(), + DocumentID: documentID, + }, + ) } -Publish(ctx context.Context, publisherID *time.ActorID, event sync.DocEvent) +// server/packs/packs.go +func PushPull(...) (...) { + // Publish DocumentsChangedEvent during pushpull + be.Coordinator.Publish( + ctx, + publisherID, + sync.DocEvent{ + Type: types.DocumentsChangedEvent, + Publisher: publisherID, + DocumentID: docInfo.ID, + }, + ) +} ``` -When performing `WatchDocuments`, `UpdatePresence`, and `PushPull`, the `Publish` method is called to deliver the `DocEvent`. - #### 3. Fire Event -The event is sent to the `Subscription` channels that subscribe to the same document by iterating through the `documentKeys` of the `DocEvent`. +The `Publish` method sends a `DocEvent` to the event channel of subscriptions that are subscribing to the document of the `DocEvent`. Through `subscriptionsMapByDocID`, we can find the subscriptions (created in step 1) that are subscribing to a specific document. Subsequently, the `DocEvent` is sent to the event channels of these subscriptions. [(code🔍)](https://github.com/yorkie-team/yorkie/blob/16fd182021231d75562a933cb32d924af16fc7f4/server/backend/sync/memory/pubsub.go#L150-L196). ```go func (m *PubSub) Publish( @@ -82,34 +110,33 @@ func (m *PubSub) Publish( m.subscriptionsMapMu.RLock() defer m.subscriptionsMapMu.RUnlock() - for _, docKey := range event.DocumentKeys { - k := docKey.String() - - if subs, ok := m.subscriptionsMapByDocKey[k]; ok { - for _, sub := range subs.Map() { - // If the subscriber is itself, do not send - if sub.Subscriber().ID.Compare(publisherID) == 0 { - continue - } + documentID := event.DocumentID + if subs, ok := m.subscriptionsMapByDocID[documentID]; ok { + for _, sub := range subs.Map() { + // If the subscriber is itself, do not send + if sub.Subscriber().Compare(publisherID) == 0 { + continue + } - // Send the event to the peer's event channel - sub.Events() <- event + select { + // Send the event to the peer's event channel + case sub.Events() <- event: } } } } ``` -#### 4. Send watchDocuments response to stream +#### 4. Send watchDocument response to stream -In the `select` statement from step 1, when the `Subscription` channel receives an event, the event is sent to the `watchDocumentsResponse` of the rpc stream. +When the event channel of `Subscription` receives an event, the event is sent to the `WatchDocumentResponse` of the rpc stream. [(code🔍)](https://github.com/yorkie-team/yorkie/blob/16fd182021231d75562a933cb32d924af16fc7f4/server/rpc/yorkie_server.go#L421-L443) ```go -func (s *yorkieServer) WatchDocuments( - req *api.WatchDocumentsRequest, - stream api.YorkieService_WatchDocumentsServer, +func (s *yorkieServer) WatchDocument( + req *api.WatchDocumentRequest, + stream api.YorkieService_WatchDocumentServer, ) error { - // ... + // ... for { select { case <-s.serviceCtx.Done(): @@ -119,12 +146,11 @@ func (s *yorkieServer) WatchDocuments( case event := <-subscription.Events(): eventType, err := converter.ToDocEventType(event.Type) - if err := stream.Send(&api.WatchDocumentsResponse{ - Body: &api.WatchDocumentsResponse_Event{ + if err := stream.Send(&api.WatchDocumentResponse{ + Body: &api.WatchDocumentResponse_Event{ Event: &api.DocEvent{ - Type: eventType, - Publisher: converter.ToClient(event.Publisher), - DocumentKeys: converter.ToDocumentKeys(event.DocumentKeys), + Type: eventType, + Publisher: event.Publisher.Bytes(), }, }, }); err != nil { @@ -137,4 +163,4 @@ func (s *yorkieServer) WatchDocuments( ### Risks and Mitigation -Currently, Subscription instances are managed in memory. \ No newline at end of file +Currently, Subscription instances are managed in memory.