Skip to content

Commit

Permalink
Clean up methods related to presence (#599)
Browse files Browse the repository at this point in the history
* Add presence to unwatched event and modify getPresence to retrieve only online client

* Return presence to the user by making a copy

* Add test for pause and resume

* Update test/integration/presence_test.ts

* Fix some non-deterministic behavior in test cases related to Presence

---------

Co-authored-by: Youngteac Hong <[email protected]>
  • Loading branch information
chacha912 and hackerwins authored Aug 4, 2023
1 parent 1decb55 commit 8fd150a
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 85 deletions.
13 changes: 9 additions & 4 deletions public/quill.html
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@
});
doc.subscribe('others', (event) => {
if (event.type === 'unwatched') {
const { clientID } = event.value;
cursors.removeCursor(doc.getPresence(clientID).username);
const { clientID, presence } = event.value;
cursors.removeCursor(presence.username);
} else if (event.type === 'presence-changed') {
const { clientID, presence } = event.value;
const range = doc
Expand Down Expand Up @@ -222,8 +222,13 @@
to = from + op.delete;
console.log(`%c local: ${from}-${to}: ''`, 'color: green');

doc.update((root) => {
root.content.edit(from, to, '');
doc.update((root, presence) => {
const range = root.content.edit(from, to, '');
if (range) {
presence.set({
selection: root.content.indexRangeToPosRange(range),
});
}
}, `update content by ${client.getID()}`);
} else if (op.retain !== undefined) {
from = to + op.retain;
Expand Down
15 changes: 11 additions & 4 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,8 @@ export class Client implements Observable<ClientEvent> {
break;
case PbDocEventType.DOC_EVENT_TYPE_DOCUMENTS_WATCHED:
attachment.doc.addOnlineClient(publisher);
// NOTE(chacha912): We added to onlineClients, but we won't trigger watched event
// unless we also know their initial presence data at this point.
if (attachment.doc.hasPresence(publisher)) {
attachment.doc.publish({
type: DocEventType.Watched,
Expand All @@ -903,11 +905,16 @@ export class Client implements Observable<ClientEvent> {
}
break;
case PbDocEventType.DOC_EVENT_TYPE_DOCUMENTS_UNWATCHED: {
const presence = attachment.doc.getPresence(publisher);
attachment.doc.removeOnlineClient(publisher);
attachment.doc.publish({
type: DocEventType.Unwatched,
value: { clientID: publisher },
});
// NOTE(chacha912): There is no presence, when PresenceChange(clear) is applied before unwatching.
// In that case, the 'unwatched' event is triggered while handling the PresenceChange.
if (presence) {
attachment.doc.publish({
type: DocEventType.Unwatched,
value: { clientID: publisher, presence },
});
}
break;
}
}
Expand Down
97 changes: 68 additions & 29 deletions src/document/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ export type DocEvent<P extends Indexable = Indexable, T = OperationInfo> =
| RemoteChangeEvent<T>
| InitializedEvent<P>
| WatchedEvent<P>
| UnwatchedEvent
| UnwatchedEvent<P>
| PresenceChangedEvent<P>;

/**
Expand Down Expand Up @@ -223,9 +223,9 @@ export interface WatchedEvent<P extends Indexable> extends BaseDocEvent {
value: { clientID: ActorID; presence: P };
}

export interface UnwatchedEvent extends BaseDocEvent {
export interface UnwatchedEvent<P extends Indexable> extends BaseDocEvent {
type: DocEventType.Unwatched;
value: { clientID: ActorID };
value: { clientID: ActorID; presence: P };
}

export interface PresenceChangedEvent<P extends Indexable>
Expand Down Expand Up @@ -446,23 +446,21 @@ export class Document<T, P extends Indexable = Indexable> {
this.clone!.root,
message,
);
const actorID = this.changeID.getActorID()!;

try {
const proxy = createJSON<JSONObject<T>>(
context,
this.clone!.root.getObject(),
);

if (!this.presences.has(this.changeID.getActorID()!)) {
this.clone!.presences.set(this.changeID.getActorID()!, {} as P);
if (!this.presences.has(actorID)) {
this.clone!.presences.set(actorID, {} as P);
}

updater(
proxy,
new Presence(
context,
this.clone!.presences.get(this.changeID.getActorID()!)!,
),
new Presence(context, this.clone!.presences.get(actorID)!),
);
} catch (err) {
// drop clone because it is contaminated.
Expand All @@ -487,7 +485,7 @@ export class Document<T, P extends Indexable = Indexable> {
value: {
message: change.getMessage() || '',
operations: opInfos,
actor: change.getID().getActorID(),
actor: actorID,
},
});
}
Expand All @@ -496,8 +494,8 @@ export class Document<T, P extends Indexable = Indexable> {
this.publish({
type: DocEventType.PresenceChanged,
value: {
clientID: change.getID().getActorID()!,
presence: this.getPresence(change.getID().getActorID()!)!,
clientID: actorID,
presence: this.getPresence(actorID)!,
},
});
}
Expand Down Expand Up @@ -975,21 +973,40 @@ export class Document<T, P extends Indexable = Indexable> {
let changeInfo: ChangeInfo | undefined;
let docEvent: DocEvent<P> | undefined;
const actorID = change.getID().getActorID()!;
if (change.hasPresenceChange()) {
if (change.hasPresenceChange() && this.onlineClients.has(actorID)) {
const presenceChange = change.getPresenceChange()!;
if (
presenceChange.type === PresenceChangeType.Put &&
this.onlineClients.has(actorID)
) {
docEvent = {
type: this.presences.has(actorID)
? DocEventType.PresenceChanged
: DocEventType.Watched,
value: {
clientID: actorID,
presence: presenceChange.presence,
},
};
switch (presenceChange.type) {
case PresenceChangeType.Put:
// NOTE(chacha912): When the user exists in onlineClients, but
// their presence was initially absent, we can consider that we have
// received their initial presence, so trigger the 'watched' event.
docEvent = {
type: this.presences.has(actorID)
? DocEventType.PresenceChanged
: DocEventType.Watched,
value: {
clientID: actorID,
presence: presenceChange.presence,
},
};
break;
case PresenceChangeType.Clear:
// NOTE(chacha912): When the user exists in onlineClients, but
// PresenceChange(clear) is received, we can consider it as detachment
// occurring before unwatching.
// Detached user is no longer participating in the document, we remove
// them from the online clients and trigger the 'unwatched' event.
docEvent = {
type: DocEventType.Unwatched,
value: {
clientID: actorID,
presence: this.getPresence(actorID)!,
},
};
this.removeOnlineClient(actorID);
break;
default:
break;
}
}

Expand Down Expand Up @@ -1047,27 +1064,35 @@ export class Document<T, P extends Indexable = Indexable> {

/**
* `setOnlineClients` sets the given online client set.
*
* @internal
*/
public setOnlineClients(onlineClients: Set<ActorID>) {
this.onlineClients = onlineClients;
}

/**
* `addOnlineClient` adds the given clientID into the online client set.
*
* @internal
*/
public addOnlineClient(clientID: ActorID) {
this.onlineClients.add(clientID);
}

/**
* `removeOnlineClient` removes the clientID from the online client set.
*
* @internal
*/
public removeOnlineClient(clientID: ActorID) {
this.onlineClients.delete(clientID);
}

/**
* `hasPresence` returns whether the given clientID has a presence or not.
*
* @internal
*/
public hasPresence(clientID: ActorID): boolean {
return this.presences.has(clientID);
Expand All @@ -1081,14 +1106,28 @@ export class Document<T, P extends Indexable = Indexable> {
return {} as P;
}

return this.presences.get(this.changeID.getActorID()!)!;
const p = this.presences.get(this.changeID.getActorID()!)!;
return deepcopy(p);
}

/**
* `getPresence` returns the presence of the given clientID.
*/
public getPresence(clientID: ActorID): P | undefined {
return this.presences.get(clientID);
if (!this.onlineClients.has(clientID)) return;
const p = this.presences.get(clientID);
return p ? deepcopy(p) : undefined;
}

/**
* `getPresenceForTest` returns the presence of the given clientID
* regardless of whether the client is online or not.
*
* @internal
*/
public getPresenceForTest(clientID: ActorID): P | undefined {
const p = this.presences.get(clientID);
return p ? deepcopy(p) : undefined;
}

/**
Expand All @@ -1100,7 +1139,7 @@ export class Document<T, P extends Indexable = Indexable> {
if (this.presences.has(clientID)) {
presences.push({
clientID,
presence: this.presences.get(clientID)!,
presence: deepcopy(this.presences.get(clientID)!),
});
}
}
Expand Down
17 changes: 13 additions & 4 deletions test/helper/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ import { OperationInfo } from '@yorkie-js-sdk/src/document/operation/operation';

export type Indexable = Record<string, any>;

export async function sleep(interval = 1000): Promise<void> {
return new Promise((resolve) => {
setTimeout(() => {
resolve();
}, interval);
});
}

export async function waitStubCallCount(
stub: sinon.SinonStub,
callCount: number,
Expand All @@ -31,12 +39,13 @@ export async function waitStubCallCount(
const doLoop = () => {
if (stub.callCount >= callCount) {
resolve();
return;
}
return false;

setTimeout(doLoop, 0);
};
if (!doLoop()) {
setTimeout(doLoop, 1000);
}

doLoop();
});
}

Expand Down
33 changes: 22 additions & 11 deletions test/integration/client_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,10 @@ describe('Client', function () {
});

await waitStubCallCount(stubD2, 1); // d2 should be able to update
assert.equal(d2Events.pop(), DocEventType.LocalChange);

assert.equal(d2Events.at(-1), DocEventType.LocalChange);
await waitStubCallCount(stubD1, 1); // d1 should be able to receive d2's update
assert.equal(d1Events.pop(), DocEventType.RemoteChange);
assert.equal(d1Events.at(-1), DocEventType.RemoteChange);
assert.equal(d1.toSortedJSON(), d2.toSortedJSON());

// Simulate network error
Expand All @@ -193,10 +194,11 @@ describe('Client', function () {
});

await waitStubCallCount(stubD2, 2); // d2 should be able to update
assert.equal(d2Events.pop(), DocEventType.LocalChange);
assert.equal(d2Events.at(-1), DocEventType.LocalChange);
await waitStubCallCount(stubC2, 2); // c2 should fail to sync

assert.equal(
c2Events.pop(),
c2Events.at(-1),
DocumentSyncResultType.SyncFailed,
'c2 sync fail',
);
Expand All @@ -205,7 +207,7 @@ describe('Client', function () {
assert.equal(err.message, 'INVALID_STATE_ERR - 0'); // c1 should also fail to sync
});
assert.equal(
c1Events.pop(),
c1Events.at(-1),
DocumentSyncResultType.SyncFailed,
'c1 sync fail',
);
Expand All @@ -216,11 +218,16 @@ describe('Client', function () {
xhr.restore();

await waitStubCallCount(stubC2, 3); // wait for c2 to sync
assert.equal(c2Events.pop(), DocumentSyncResultType.Synced, 'c2 sync');
await waitStubCallCount(stubC1, 5);
assert.equal(c1Events.pop(), DocumentSyncResultType.Synced, 'c1 sync');
assert.equal(c2Events.at(-1), DocumentSyncResultType.Synced, 'c2 sync');
await waitStubCallCount(stubC1, 6);
assert.isTrue(
[c1Events.at(-1), c1Events.at(-2)].includes(
DocumentSyncResultType.Synced,
),
'c1 sync',
);
await waitStubCallCount(stubD1, 2);
assert.equal(d1Events.pop(), DocEventType.RemoteChange); // d1 should be able to receive d2's update
assert.equal(d1Events.at(-1), DocEventType.RemoteChange); // d1 should be able to receive d2's update
assert.equal(d1.toSortedJSON(), d2.toSortedJSON());

unsub1.client();
Expand Down Expand Up @@ -268,8 +275,12 @@ describe('Client', function () {
root.version = 'v2';
});
await c1.sync();
await waitStubCallCount(stubC2, 2);
assert.equal(c2Events.pop(), ClientEventType.DocumentSynced);
await waitStubCallCount(stubC2, 3);
assert.isTrue(
[c2Events.at(-1), c2Events.at(-1)].includes(
ClientEventType.DocumentSynced,
),
);
assert.equal(d1.toSortedJSON(), d2.toSortedJSON());
unsub1();

Expand Down
8 changes: 4 additions & 4 deletions test/integration/document_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -589,26 +589,26 @@ describe('Document', function () {
d1.update((root) => {
root['k1'] = [1, 2];
}, 'set array');
await c1.attach(d1);
await c1.attach(d1, { isRealtimeSync: false });
assert.equal(d1.toSortedJSON(), '{"k1":[1,2]}');

const c2 = new yorkie.Client(testRPCAddr);
await c2.activate();
const d2 = new yorkie.Document<TestDoc>(docKey);
await c2.attach(d2);
await c2.attach(d2, { isRealtimeSync: false });
assert.equal(d2.toSortedJSON(), '{"k1":[1,2]}');

// 02. c1 updates d1 and removes it.
d1.update((root) => {
root['k1'].push(3);
});
await c1.remove(d1);
assert.equal(d1.toSortedJSON(), '{"k1":[1,2,3]}');
assert.equal(d1.toSortedJSON(), '{"k1":[1,2,3]}', 'd1');
assert.equal(d1.getStatus(), DocumentStatus.Removed);

// 03. c2 syncs and checks that d2 is removed.
await c2.sync();
assert.equal(d2.toSortedJSON(), '{"k1":[1,2,3]}');
assert.equal(d2.toSortedJSON(), '{"k1":[1,2,3]}', 'd2');
assert.equal(d2.getStatus(), DocumentStatus.Removed);

await c1.deactivate();
Expand Down
Loading

0 comments on commit 8fd150a

Please sign in to comment.