From 4aa9a04addae5cb9bb09d99792fd317a5f2f6e5b Mon Sep 17 00:00:00 2001 From: jee-hyun-kim Date: Thu, 4 Apr 2024 15:23:43 +0900 Subject: [PATCH] refactor presence event handling --- .../kotlin/dev/yorkie/core/PresenceTest.kt | 42 +++++++ .../src/main/kotlin/dev/yorkie/core/Client.kt | 6 +- .../kotlin/dev/yorkie/document/Document.kt | 114 ++++++++++-------- 3 files changed, 112 insertions(+), 50 deletions(-) diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/PresenceTest.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/PresenceTest.kt index ce498905d..879a9ddc1 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/PresenceTest.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/PresenceTest.kt @@ -711,5 +711,47 @@ class PresenceTest { } } + @Test + fun test_emit_the_same_presence_multiple_times() { + withTwoClientsAndDocuments(realTimeSync = false) { c1, c2, d1, d2, _ -> + val d1Events = mutableListOf() + val collectJob = launch(start = CoroutineStart.UNDISPATCHED) { + d1.events.filterIsInstance().collect(d1Events::add) + } + + c1.resume(d1) + c2.resume(d2) + + d2.updateAsync { _, presence -> + presence.put(mapOf("a" to "b")) + }.await() + + delay(100) + + d2.updateAsync { _, presence -> + presence.put(mapOf("a" to "b")) + }.await() + + d2.updateAsync { _, presence -> + presence.put(mapOf("a" to "b")) + }.await() + + withTimeout(GENERAL_TIMEOUT) { + while (d1Events.size < 4) { + delay(50) + } + } + assertEquals(4, d1Events.size) + assertIs(d1Events.first()) + d1Events.drop(1).forEach { event -> + assertEquals( + Others.PresenceChanged(PresenceInfo(c2.requireClientId(), mapOf("a" to "b"))), + event, + ) + } + collectJob.cancel() + } + } + private data class Cursor(val x: Int, val y: Int) } diff --git a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt index 1811393c9..01e5dc350 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt @@ -288,7 +288,7 @@ public class Client @VisibleForTesting internal constructor( if (response.hasInitialization()) { val document = attachments.value[documentKey]?.document ?: return val clientIDs = response.initialization.clientIdsList.map { ActorID(it) } - document.presenceEventQueue.add( + document.pendingPresenceEvents.add( PresenceChange.MyPresence.Initialized( document.allPresences.value.filterKeys { it in clientIDs }.asPresences(), ), @@ -310,7 +310,7 @@ public class Client @VisibleForTesting internal constructor( // unless we also know their initial presence data at this point. val presence = document.allPresences.value[publisher] if (presence != null) { - document.presenceEventQueue.add( + document.pendingPresenceEvents.add( PresenceChange.Others.Watched(PresenceInfo(publisher, presence)), ) } @@ -322,7 +322,7 @@ public class Client @VisibleForTesting internal constructor( // when PresenceChange(clear) is applied before unwatching. In that case, // the 'unwatched' event is triggered while handling the PresenceChange. val presence = document.presences.value[publisher] ?: return - document.presenceEventQueue.add( + document.pendingPresenceEvents.add( PresenceChange.Others.Unwatched(PresenceInfo(publisher, presence)), ) document.onlineClients.value -= publisher diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt index bf133722e..497a1ce76 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt @@ -97,7 +97,8 @@ public class Document( internal val garbageLengthFromClone: Int get() = clone?.root?.getGarbageLength() ?: 0 - internal val presenceEventQueue = mutableListOf() + private val presenceEventQueue = mutableListOf() + internal val pendingPresenceEvents = mutableListOf() internal val onlineClients = MutableStateFlow(setOf()) @@ -106,7 +107,13 @@ public class Document( combine(_presences, onlineClients) { presences, onlineClients -> presences.filterKeys { it in onlineClients }.asPresences() }.stateIn(scope, SharingStarted.Eagerly, _presences.value.asPresences()).also { - scope.launch { it.collect(::publishPresenceEvent) } + scope.launch { + it.collect { presences -> + presenceEventQueue.addAll(pendingPresenceEvents) + pendingPresenceEvents.clear() + publishPresenceEvent(presences) + } + } } internal val allPresences: StateFlow = _presences.asStateFlow() @@ -161,12 +168,12 @@ public class Document( eventStream.emit(Event.LocalChange(change.toChangeInfo(operationInfos))) } if (change.hasPresenceChange) { - val presence = newPresences?.get(actorID) ?: _presences.value[actorID] - presence?.let { - presenceEventQueue.add(createPresenceChangedEvent(actorID, presence)) + val presence = + newPresences?.get(actorID) ?: _presences.value[actorID] ?: return@async false + newPresences?.let { + emitPresences(it, createPresenceChangedEvent(actorID, presence)) } } - newPresences?.let { emitPresences(it) } true } } @@ -273,7 +280,8 @@ public class Document( private suspend fun applySnapshot(serverSeq: Long, snapshot: ByteString) { val (root, presences) = snapshot.toSnapshot() this.root = CrdtRoot(root) - emitPresences(presences.asPresences()) + _presences.value = presences.asPresences() + YorkieLogger.d("Document.snapshot", "Snapshot: ${snapshot.toSnapshot()}") changeID = changeID.syncLamport(serverSeq) clone = null eventStream.emit(Event.Snapshot(snapshot)) @@ -323,9 +331,7 @@ public class Document( if (opInfos.isNotEmpty()) { eventStream.emit(Event.RemoteChange(change.toChangeInfo(opInfos))) } - - presenceEvent?.let(presenceEventQueue::add) - newPresences?.let { emitPresences(it) } + newPresences?.let { emitPresences(it, presenceEvent) } changeID = changeID.syncLamport(change.id.lamport) } } @@ -334,12 +340,59 @@ public class Document( clone ?: RootClone(root.deepCopy(), _presences.value.asPresences()).also { clone = it } } - private suspend fun emitPresences(newPresences: Presences) { + private suspend fun emitPresences(newPresences: Presences, event: Event.PresenceChange?) { if (newPresences == _presences.value) { - publishPresenceEvent(newPresences) + event?.let { + if (presenceEventReadyToBePublished(it, newPresences)) { + eventStream.emit(it) + } else { + pendingPresenceEvents.add(it) + } + } + } else { + event?.let(pendingPresenceEvents::add) + _presences.emit(newPresences) + clone = ensureClone().copy(presences = newPresences) + } + } + + /** + * Triggers an event in this [Document]. + */ + private suspend fun publishPresenceEvent(presences: Presences) { + val iterator = presenceEventQueue.listIterator() + while (iterator.hasNext()) { + val event = iterator.next() + if (event is Others && event.changed.actorID == changeID.actor) { + iterator.remove() + continue + } + + if (presenceEventReadyToBePublished(event, presences)) { + eventStream.emit(event) + iterator.remove() + } + } + } + + private fun presenceEventReadyToBePublished( + event: Event.PresenceChange, + presences: Presences, + ): Boolean { + return when (event) { + is MyPresence.Initialized -> presences.keys.containsAll(event.initialized.keys) + is MyPresence.PresenceChanged -> { + val actorID = event.changed.actorID + actorID !in presences || event.changed.presence == presences[actorID] + } + + is Others.Watched -> event.changed.actorID in presences + is Others.Unwatched -> event.changed.actorID !in presences + is Others.PresenceChanged -> { + val actorID = event.changed.actorID + actorID !in presences || event.changed.presence == presences[actorID] + } } - _presences.emit(newPresences) - clone = ensureClone().copy(presences = newPresences) } /** @@ -390,39 +443,6 @@ public class Document( return root.garbageCollect(ticket) } - /** - * Triggers an event in this [Document]. - */ - private suspend fun publishPresenceEvent(presences: Presences) { - val publishedEvents = mutableListOf() - presenceEventQueue.toList().forEach { event -> - if (event is Others && event.changed.actorID == changeID.actor) { - publishedEvents.add(event) - return@forEach - } - - val eventReady = when (event) { - is MyPresence.Initialized -> presences.keys.containsAll(event.initialized.keys) - is MyPresence.PresenceChanged -> { - val actorID = event.changed.actorID - actorID !in presences || event.changed.presence == presences[actorID] - } - - is Others.Watched -> event.changed.actorID in presences - is Others.Unwatched -> event.changed.actorID !in presences - is Others.PresenceChanged -> { - val actorID = event.changed.actorID - actorID !in presences || event.changed.presence == presences[actorID] - } - } - if (eventReady) { - eventStream.emit(event) - publishedEvents.add(event) - } - } - presenceEventQueue.removeAll(publishedEvents) - } - private fun Change.toChangeInfo(operationInfos: List) = Event.ChangeInfo(message.orEmpty(), operationInfos, id.actor)