Skip to content

Commit

Permalink
refactor presence event handling
Browse files Browse the repository at this point in the history
  • Loading branch information
7hong13 committed Apr 4, 2024
1 parent 8c2e63f commit 4aa9a04
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 50 deletions.
42 changes: 42 additions & 0 deletions yorkie/src/androidTest/kotlin/dev/yorkie/core/PresenceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -711,5 +711,47 @@ class PresenceTest {
}
}

@Test
fun test_emit_the_same_presence_multiple_times() {
withTwoClientsAndDocuments(realTimeSync = false) { c1, c2, d1, d2, _ ->
val d1Events = mutableListOf<Document.Event.PresenceChange>()
val collectJob = launch(start = CoroutineStart.UNDISPATCHED) {
d1.events.filterIsInstance<Others>().collect(d1Events::add)
}

c1.resume(d1)
c2.resume(d2)

d2.updateAsync { _, presence ->
presence.put(mapOf("a" to "b"))
}.await()

delay(100)

d2.updateAsync { _, presence ->
presence.put(mapOf("a" to "b"))
}.await()

d2.updateAsync { _, presence ->
presence.put(mapOf("a" to "b"))
}.await()

withTimeout(GENERAL_TIMEOUT) {
while (d1Events.size < 4) {
delay(50)
}
}
assertEquals(4, d1Events.size)
assertIs<Others.Watched>(d1Events.first())
d1Events.drop(1).forEach { event ->
assertEquals(
Others.PresenceChanged(PresenceInfo(c2.requireClientId(), mapOf("a" to "b"))),
event,
)
}
collectJob.cancel()
}
}

private data class Cursor(val x: Int, val y: Int)
}
6 changes: 3 additions & 3 deletions yorkie/src/main/kotlin/dev/yorkie/core/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public class Client @VisibleForTesting internal constructor(
if (response.hasInitialization()) {
val document = attachments.value[documentKey]?.document ?: return
val clientIDs = response.initialization.clientIdsList.map { ActorID(it) }
document.presenceEventQueue.add(
document.pendingPresenceEvents.add(
PresenceChange.MyPresence.Initialized(
document.allPresences.value.filterKeys { it in clientIDs }.asPresences(),
),
Expand All @@ -310,7 +310,7 @@ public class Client @VisibleForTesting internal constructor(
// unless we also know their initial presence data at this point.
val presence = document.allPresences.value[publisher]
if (presence != null) {
document.presenceEventQueue.add(
document.pendingPresenceEvents.add(
PresenceChange.Others.Watched(PresenceInfo(publisher, presence)),
)
}
Expand All @@ -322,7 +322,7 @@ public class Client @VisibleForTesting internal constructor(
// when PresenceChange(clear) is applied before unwatching. In that case,
// the 'unwatched' event is triggered while handling the PresenceChange.
val presence = document.presences.value[publisher] ?: return
document.presenceEventQueue.add(
document.pendingPresenceEvents.add(
PresenceChange.Others.Unwatched(PresenceInfo(publisher, presence)),
)
document.onlineClients.value -= publisher
Expand Down
114 changes: 67 additions & 47 deletions yorkie/src/main/kotlin/dev/yorkie/document/Document.kt
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public class Document(
internal val garbageLengthFromClone: Int
get() = clone?.root?.getGarbageLength() ?: 0

internal val presenceEventQueue = mutableListOf<Event.PresenceChange>()
private val presenceEventQueue = mutableListOf<Event.PresenceChange>()
internal val pendingPresenceEvents = mutableListOf<Event.PresenceChange>()

internal val onlineClients = MutableStateFlow(setOf<ActorID>())

Expand All @@ -106,7 +107,13 @@ public class Document(
combine(_presences, onlineClients) { presences, onlineClients ->
presences.filterKeys { it in onlineClients }.asPresences()
}.stateIn(scope, SharingStarted.Eagerly, _presences.value.asPresences()).also {
scope.launch { it.collect(::publishPresenceEvent) }
scope.launch {
it.collect { presences ->
presenceEventQueue.addAll(pendingPresenceEvents)
pendingPresenceEvents.clear()
publishPresenceEvent(presences)
}
}
}

internal val allPresences: StateFlow<Presences> = _presences.asStateFlow()
Expand Down Expand Up @@ -161,12 +168,12 @@ public class Document(
eventStream.emit(Event.LocalChange(change.toChangeInfo(operationInfos)))
}
if (change.hasPresenceChange) {
val presence = newPresences?.get(actorID) ?: _presences.value[actorID]
presence?.let {
presenceEventQueue.add(createPresenceChangedEvent(actorID, presence))
val presence =
newPresences?.get(actorID) ?: _presences.value[actorID] ?: return@async false
newPresences?.let {
emitPresences(it, createPresenceChangedEvent(actorID, presence))
}
}
newPresences?.let { emitPresences(it) }
true
}
}
Expand Down Expand Up @@ -273,7 +280,8 @@ public class Document(
private suspend fun applySnapshot(serverSeq: Long, snapshot: ByteString) {
val (root, presences) = snapshot.toSnapshot()
this.root = CrdtRoot(root)
emitPresences(presences.asPresences())
_presences.value = presences.asPresences()
YorkieLogger.d("Document.snapshot", "Snapshot: ${snapshot.toSnapshot()}")
changeID = changeID.syncLamport(serverSeq)
clone = null
eventStream.emit(Event.Snapshot(snapshot))
Expand Down Expand Up @@ -323,9 +331,7 @@ public class Document(
if (opInfos.isNotEmpty()) {
eventStream.emit(Event.RemoteChange(change.toChangeInfo(opInfos)))
}

presenceEvent?.let(presenceEventQueue::add)
newPresences?.let { emitPresences(it) }
newPresences?.let { emitPresences(it, presenceEvent) }
changeID = changeID.syncLamport(change.id.lamport)
}
}
Expand All @@ -334,12 +340,59 @@ public class Document(
clone ?: RootClone(root.deepCopy(), _presences.value.asPresences()).also { clone = it }
}

private suspend fun emitPresences(newPresences: Presences) {
private suspend fun emitPresences(newPresences: Presences, event: Event.PresenceChange?) {
if (newPresences == _presences.value) {
publishPresenceEvent(newPresences)
event?.let {
if (presenceEventReadyToBePublished(it, newPresences)) {
eventStream.emit(it)
} else {
pendingPresenceEvents.add(it)
}
}
} else {
event?.let(pendingPresenceEvents::add)
_presences.emit(newPresences)
clone = ensureClone().copy(presences = newPresences)
}
}

/**
* Triggers an event in this [Document].
*/
private suspend fun publishPresenceEvent(presences: Presences) {
val iterator = presenceEventQueue.listIterator()
while (iterator.hasNext()) {
val event = iterator.next()
if (event is Others && event.changed.actorID == changeID.actor) {
iterator.remove()
continue
}

if (presenceEventReadyToBePublished(event, presences)) {
eventStream.emit(event)
iterator.remove()
}
}
}

private fun presenceEventReadyToBePublished(
event: Event.PresenceChange,
presences: Presences,
): Boolean {
return when (event) {
is MyPresence.Initialized -> presences.keys.containsAll(event.initialized.keys)
is MyPresence.PresenceChanged -> {
val actorID = event.changed.actorID
actorID !in presences || event.changed.presence == presences[actorID]
}

is Others.Watched -> event.changed.actorID in presences
is Others.Unwatched -> event.changed.actorID !in presences
is Others.PresenceChanged -> {
val actorID = event.changed.actorID
actorID !in presences || event.changed.presence == presences[actorID]
}
}
_presences.emit(newPresences)
clone = ensureClone().copy(presences = newPresences)
}

/**
Expand Down Expand Up @@ -390,39 +443,6 @@ public class Document(
return root.garbageCollect(ticket)
}

/**
* Triggers an event in this [Document].
*/
private suspend fun publishPresenceEvent(presences: Presences) {
val publishedEvents = mutableListOf<Event.PresenceChange>()
presenceEventQueue.toList().forEach { event ->
if (event is Others && event.changed.actorID == changeID.actor) {
publishedEvents.add(event)
return@forEach
}

val eventReady = when (event) {
is MyPresence.Initialized -> presences.keys.containsAll(event.initialized.keys)
is MyPresence.PresenceChanged -> {
val actorID = event.changed.actorID
actorID !in presences || event.changed.presence == presences[actorID]
}

is Others.Watched -> event.changed.actorID in presences
is Others.Unwatched -> event.changed.actorID !in presences
is Others.PresenceChanged -> {
val actorID = event.changed.actorID
actorID !in presences || event.changed.presence == presences[actorID]
}
}
if (eventReady) {
eventStream.emit(event)
publishedEvents.add(event)
}
}
presenceEventQueue.removeAll(publishedEvents)
}

private fun Change.toChangeInfo(operationInfos: List<OperationInfo>) =
Event.ChangeInfo(message.orEmpty(), operationInfos, id.actor)

Expand Down

0 comments on commit 4aa9a04

Please sign in to comment.