Skip to content

Commit

Permalink
fix: stuck while processing messages (#2837)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitorhugods authored Jun 24, 2024
1 parent e59b28f commit f34993a
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import com.wire.kalium.logic.logStructuredJson
import com.wire.kalium.logic.sync.SyncManager
import com.wire.kalium.util.DateTimeUtil
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collectLatest
Expand Down Expand Up @@ -66,7 +67,7 @@ internal class ConfirmationDeliveryHandlerImpl(
) : ConfirmationDeliveryHandler {

private val kaliumLogger = kaliumLogger.withTextTag("ConfirmationDeliveryHandler")
private val holder = MutableSharedFlow<Unit>()
private val holder = MutableSharedFlow<Unit>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
private val mutex = Mutex()

override suspend fun enqueueConfirmationDelivery(conversationId: ConversationId, messageId: String) = mutex.withLock {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ import io.mockative.any
import io.mockative.coEvery
import io.mockative.coVerify
import io.mockative.mock
import io.mockative.twice
import io.mockative.once
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
Expand All @@ -50,6 +51,7 @@ import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.seconds

class ConfirmationDeliveryHandlerTest {

Expand Down Expand Up @@ -168,10 +170,63 @@ class ConfirmationDeliveryHandlerTest {
job.cancel()

coVerify { arrangement.conversationRepository.observeCacheDetailsById(any()) }.wasInvoked()
coVerify { arrangement.messageSender.sendMessage(any(), any()) }.wasInvoked(twice)
coVerify { arrangement.messageSender.sendMessage(any(), any()) }.wasInvoked(once)
assertTrue(arrangement.pendingConfirmationMessages.isEmpty())
}


@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun givenMultipleEnqueues_whenSendingConfirmations_thenShouldOnlySendOnce() = runTest {
val (arrangement, sut) = Arrangement()
.withCurrentClientIdProvider()
.withConversationDetailsResult(flowOf(TestConversation.CONVERSATION).right())
.withMessageSenderResult()
.arrange()

val job = launch { sut.sendPendingConfirmations() }
advanceUntilIdle()

repeat(100) {
sut.enqueueConfirmationDelivery(TestConversation.ID, uuid4().toString())
}
advanceUntilIdle()
job.cancel()

coVerify { arrangement.conversationRepository.observeCacheDetailsById(any()) }.wasInvoked()
coVerify { arrangement.messageSender.sendMessage(any(), any()) }.wasInvoked(once)
assertTrue(arrangement.pendingConfirmationMessages.isEmpty())
}

@Test
fun givenSyncIsOngoing_whenItTakesLongTimeToExecute_thenShouldReturnAnyway() = runTest {
val (arrangement, handler) = Arrangement()
.withCurrentClientIdProvider()
.withConversationDetailsResult(flowOf(TestConversation.CONVERSATION).right())
.withMessageSenderResult()
.arrange()

coEvery { arrangement.syncManager.waitUntilLive() }.invokes { ->
delay(10.seconds)
}

val sendJob = launch(Dispatchers.Default) {
handler.sendPendingConfirmations()
}
advanceTimeBy(1.seconds)

val enqueueJob = launch {
handler.enqueueConfirmationDelivery(
TestConversation.ID,
TestMessage.TEST_MESSAGE_ID
)
}
advanceTimeBy(1.seconds) // Enqueue and return immediately

assertTrue(enqueueJob.isCompleted)
sendJob.cancel()
}

private class Arrangement {

@Mock
Expand Down

0 comments on commit f34993a

Please sign in to comment.