Skip to content

Commit

Permalink
Clean up methods related to presence (yorkie-team#594)
Browse files Browse the repository at this point in the history
* Return presence to user by retrieving only online clients and making a copy

* Trigger unwatched event instead of presence-changed event during detach
  • Loading branch information
chacha912 authored and Wu22e committed Sep 3, 2023
1 parent a307f8d commit a360567
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 86 deletions.
2 changes: 1 addition & 1 deletion api/converter/to_bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

// SnapshotToBytes converts the given document to byte array.
func SnapshotToBytes(obj *crdt.Object, presences *innerpresence.Map) ([]byte, error) {
func SnapshotToBytes(obj *crdt.Object, presences map[string]innerpresence.Presence) ([]byte, error) {
pbElem, err := toJSONElement(obj)
if err != nil {
return nil, err
Expand Down
7 changes: 3 additions & 4 deletions api/converter/to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,11 @@ func ToDocumentSummary(summary *types.DocumentSummary) (*api.DocumentSummary, er
}

// ToPresences converts the given model to Protobuf format.
func ToPresences(presences *innerpresence.Map) map[string]*api.Presence {
func ToPresences(presences map[string]innerpresence.Presence) map[string]*api.Presence {
pbPresences := make(map[string]*api.Presence)
presences.Range(func(k string, v innerpresence.Presence) bool {
for k, v := range presences {
pbPresences[k] = ToPresence(v)
return true
})
}
return pbPresences
}

Expand Down
13 changes: 9 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func (c *Client) Watch(
clientIDs = append(clientIDs, id.String())
}

doc.SetOnlineClientSet(clientIDs...)
doc.SetOnlineClients(clientIDs...)
return nil, nil
case *api.WatchDocumentResponse_Event:
eventType, err := converter.FromEventType(resp.Event.Type)
Expand All @@ -450,19 +450,22 @@ func (c *Client) Watch(
return &WatchResponse{Type: DocumentChanged}, nil
case types.DocumentsWatchedEvent:
doc.AddOnlineClient(cli.String())
if doc.OnlinePresence(cli.String()) == nil {
if doc.Presence(cli.String()) == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentWatched,
Presences: map[string]innerpresence.Presence{
cli.String(): doc.OnlinePresence(cli.String()),
cli.String(): doc.Presence(cli.String()),
},
}, nil
case types.DocumentsUnwatchedEvent:
p := doc.OnlinePresence(cli.String())
p := doc.Presence(cli.String())
doc.RemoveOnlineClient(cli.String())
if p == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentUnwatched,
Expand Down Expand Up @@ -521,6 +524,8 @@ func (c *Client) Watch(
t := PresenceChanged
if e.Type == document.WatchedEvent {
t = DocumentWatched
} else if e.Type == document.UnwatchedEvent {
t = DocumentUnwatched
}
rch <- WatchResponse{Type: t, Presences: e.Presences}
case <-ctx.Done():
Expand Down
52 changes: 29 additions & 23 deletions pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ const (
// enabling real-time synchronization.
WatchedEvent DocEventType = "watched"

// UnwatchedEvent means that the client has disconnected from the server,
// disabling real-time synchronization.
UnwatchedEvent DocEventType = "unwatched"

// PresenceChangedEvent means that the presences of the clients who are editing
// the document have changed.
PresenceChangedEvent DocEventType = "presence-changed"
Expand Down Expand Up @@ -282,48 +286,50 @@ func (d *Document) ensureClone() error {
return nil
}

// Presences returns the presence map of this document.
func (d *Document) Presences() map[string]innerpresence.Presence {
// TODO(hackerwins): We need to use client key instead of actor ID for exposing presence.
presences := make(map[string]innerpresence.Presence)
d.doc.presences.Range(func(key string, value innerpresence.Presence) bool {
presences[key] = value
return true
})
return presences
// MyPresence returns the presence of the actor.
func (d *Document) MyPresence() innerpresence.Presence {
return d.doc.MyPresence()
}

// Presence returns the presence of the given client.
// If the client is not online, it returns nil.
func (d *Document) Presence(clientID string) innerpresence.Presence {
return d.doc.Presence(clientID)
}

// MyPresence returns the presence of the actor.
func (d *Document) MyPresence() innerpresence.Presence {
return d.doc.MyPresence()
// PresenceForTest returns the presence of the given client
// regardless of whether the client is online or not.
func (d *Document) PresenceForTest(clientID string) innerpresence.Presence {
return d.doc.PresenceForTest(clientID)
}

// SetOnlineClientSet sets the online client set.
func (d *Document) SetOnlineClientSet(clientIDs ...string) {
d.doc.SetOnlineClientSet(clientIDs...)
// Presences returns the presence map of online clients.
func (d *Document) Presences() map[string]innerpresence.Presence {
// TODO(hackerwins): We need to use client key instead of actor ID for exposing presence.
return d.doc.Presences()
}

// AllPresences returns the presence map of all clients
// regardless of whether the client is online or not.
func (d *Document) AllPresences() map[string]innerpresence.Presence {
return d.doc.AllPresences()
}

// AddOnlineClient adds the given client to the online client set.
// SetOnlineClients sets the online clients.
func (d *Document) SetOnlineClients(clientIDs ...string) {
d.doc.SetOnlineClients(clientIDs...)
}

// AddOnlineClient adds the given client to the online clients.
func (d *Document) AddOnlineClient(clientID string) {
d.doc.AddOnlineClient(clientID)
}

// RemoveOnlineClient removes the given client from the online client set.
// RemoveOnlineClient removes the given client from the online clients.
func (d *Document) RemoveOnlineClient(clientID string) {
d.doc.RemoveOnlineClient(clientID)
}

// OnlinePresence returns the presence of the given client. If the client is not
// online, it returns nil.
func (d *Document) OnlinePresence(clientID string) innerpresence.Presence {
return d.doc.OnlinePresence(clientID)
}

// Events returns the events of this document.
func (d *Document) Events() <-chan DocEvent {
return d.events
Expand Down
9 changes: 6 additions & 3 deletions pkg/document/innerpresence/presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ func (m *Map) Range(f func(clientID string, presence Presence) bool) {
}

// Load returns the presence for the given clientID.
func (m *Map) Load(clientID string) (Presence, bool) {
func (m *Map) Load(clientID string) Presence {
presence, ok := m.presences.Load(clientID)
if !ok {
return nil, false
return nil
}

return presence.(Presence), true
return presence.(Presence)
}

// LoadOrStore returns the existing presence if exists.
Expand Down Expand Up @@ -140,6 +140,9 @@ func (p Presence) Clear() {

// DeepCopy copies itself deeply.
func (p Presence) DeepCopy() Presence {
if p == nil {
return nil
}
clone := make(map[string]string)
for k, v := range p {
clone[k] = v
Expand Down
92 changes: 63 additions & 29 deletions pkg/document/internal_document.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,17 +257,29 @@ func (d *InternalDocument) ApplyChanges(changes ...*change.Change) ([]DocEvent,
if c.PresenceChange() != nil {
clientID := c.ID().ActorID().String()
if _, ok := d.onlineClients.Load(clientID); ok {
event := DocEvent{
Type: PresenceChangedEvent,
Presences: map[string]innerpresence.Presence{
clientID: c.PresenceChange().Presence,
},
switch c.PresenceChange().ChangeType {
case innerpresence.Put:
eventType := PresenceChangedEvent
if !d.presences.Has(clientID) {
eventType = WatchedEvent
}
event := DocEvent{
Type: eventType,
Presences: map[string]innerpresence.Presence{
clientID: c.PresenceChange().Presence,
},
}
events = append(events, event)
case innerpresence.Clear:
event := DocEvent{
Type: UnwatchedEvent,
Presences: map[string]innerpresence.Presence{
clientID: d.Presence(clientID),
},
}
events = append(events, event)
d.RemoveOnlineClient(clientID)
}

if !d.presences.Has(clientID) {
event.Type = WatchedEvent
}
events = append(events, event)
}
}

Expand All @@ -283,34 +295,56 @@ func (d *InternalDocument) ApplyChanges(changes ...*change.Change) ([]DocEvent,

// MyPresence returns the presence of the actor currently editing the document.
func (d *InternalDocument) MyPresence() innerpresence.Presence {
p := d.presences.LoadOrStore(d.changeID.ActorID().String(), innerpresence.NewPresence())
if d.status != StatusAttached {
return innerpresence.NewPresence()
}
p := d.presences.Load(d.changeID.ActorID().String())
return p.DeepCopy()
}

// Presences returns the map of presences of the actors currently editing the document.
func (d *InternalDocument) Presences() *innerpresence.Map {
return d.presences
}

// OnlinePresence returns the presence of the given client. If the client is not
// online, it returns nil.
func (d *InternalDocument) OnlinePresence(clientID string) innerpresence.Presence {
// Presence returns the presence of the given client.
// If the client is not online, it returns nil.
func (d *InternalDocument) Presence(clientID string) innerpresence.Presence {
if _, ok := d.onlineClients.Load(clientID); !ok {
return nil
}

presence, _ := d.presences.Load(clientID)
return presence
return d.presences.Load(clientID).DeepCopy()
}

// Presence returns the presence of the given client.
func (d *InternalDocument) Presence(clientID string) innerpresence.Presence {
presence, _ := d.presences.Load(clientID)
return presence
// PresenceForTest returns the presence of the given client
// regardless of whether the client is online or not.
func (d *InternalDocument) PresenceForTest(clientID string) innerpresence.Presence {
return d.presences.Load(clientID).DeepCopy()
}

// Presences returns the presence map of online clients.
func (d *InternalDocument) Presences() map[string]innerpresence.Presence {
presences := make(map[string]innerpresence.Presence)
d.onlineClients.Range(func(key, value interface{}) bool {
p := d.presences.Load(key.(string))
if p == nil {
return true
}
presences[key.(string)] = p.DeepCopy()
return true
})
return presences
}

// AllPresences returns the presence map of all clients
// regardless of whether the client is online or not.
func (d *InternalDocument) AllPresences() map[string]innerpresence.Presence {
presences := make(map[string]innerpresence.Presence)
d.presences.Range(func(key string, value innerpresence.Presence) bool {
presences[key] = value.DeepCopy()
return true
})
return presences
}

// SetOnlineClientSet sets the online client set.
func (d *InternalDocument) SetOnlineClientSet(ids ...string) {
// SetOnlineClients sets the online clients.
func (d *InternalDocument) SetOnlineClients(ids ...string) {
d.onlineClients.Range(func(key, value interface{}) bool {
d.onlineClients.Delete(key)
return true
Expand All @@ -321,12 +355,12 @@ func (d *InternalDocument) SetOnlineClientSet(ids ...string) {
}
}

// AddOnlineClient adds the given client to the online client set.
// AddOnlineClient adds the given client to the online clients.
func (d *InternalDocument) AddOnlineClient(clientID string) {
d.onlineClients.Store(clientID, true)
}

// RemoveOnlineClient removes the given client from the online client set.
// RemoveOnlineClient removes the given client from the online clients.
func (d *InternalDocument) RemoveOnlineClient(clientID string) {
d.onlineClients.Delete(clientID)
}
2 changes: 1 addition & 1 deletion server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ func (d *DB) CreateSnapshotInfo(
docID types.ID,
doc *document.InternalDocument,
) error {
snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.Presences())
snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.AllPresences())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ func (c *Client) CreateSnapshotInfo(
if err != nil {
return err
}
snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.Presences())
snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.AllPresences())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion server/packs/pushpull.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func pullSnapshot(
}
cpAfterPull := cpAfterPush.NextServerSeq(docInfo.ServerSeq)

snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.Presences())
snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.AllPresences())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion server/rpc/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (s *adminServer) GetSnapshotMeta(
return nil, err
}

snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.Presences())
snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.AllPresences())
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit a360567

Please sign in to comment.