Skip to content

Commit

Permalink
fix: port migration 1 on 1 resolution for mls migration (WPB-15191) (…
Browse files Browse the repository at this point in the history
…WPB-11194) (#3221)

* fix: fallback to proteus if conversation already present but MLS is default (WPB-15191) (#3200)

* fix: fallback to proteus if conversation already present and no common protocol

* fix: test coverage

* fix: test coverage

* fix: Migration message not shown in 1:1 conversations (#3042)

* fix: update common protocol resolution (#3208)

* feat: detekt

---------

Co-authored-by: Oussama Hassine <[email protected]>
  • Loading branch information
yamilmedina and ohassine authored Jan 9, 2025
1 parent f8e4329 commit f9fcff1
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,8 @@ class UserSessionScope internal constructor(
conversationGroupRepository,
conversationRepository,
messageRepository,
userRepository
userRepository,
systemMessageInserter
)
private val oneOnOneResolver: OneOnOneResolver
get() = OneOnOneResolverImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ import kotlinx.coroutines.flow.first
/**
* Operation that creates one-to-one Conversation with specific [UserId] (only if it is absent in local DB)
* and returns [Conversation] data.
*
* @param otherUserId [UserId] private conversation with which we are interested in.
* @return Result with [Conversation] in case of success, or [CoreFailure] if something went wrong:
* can't get data from local DB, or can't create a conversation.
*/
interface GetOrCreateOneToOneConversationUseCase {
suspend operator fun invoke(otherUserId: UserId): CreateConversationResult
Expand All @@ -47,6 +43,14 @@ internal class GetOrCreateOneToOneConversationUseCaseImpl(
private val userRepository: UserRepository,
private val oneOnOneResolver: OneOnOneResolver
) : GetOrCreateOneToOneConversationUseCase {

/**
* The use case operation operation params and return type.
*
* @param otherUserId [UserId] private conversation with which we are interested in.
* @return Result with [Conversation] in case of success, or [CoreFailure] if something went wrong:
* can't get data from local DB, or can't create a conversation.
*/
override suspend operator fun invoke(otherUserId: UserId): CreateConversationResult {
// TODO periodically re-resolve one-on-one
return conversationRepository.observeOneToOneConversationWithOtherUser(otherUserId)
Expand All @@ -66,6 +70,18 @@ internal class GetOrCreateOneToOneConversationUseCaseImpl(
})
}

/**
* Resolves one-on-one conversation with the user.
* Resolving conversations is the process of:
*
* - Intersecting the supported protocols of the self user and the other user.
* - Selecting the common protocol, based on the team settings with the highest priority.
* - Get or create a conversation with the other user.
* - If the protocol now is MLS, migrate the existing Proteus conversation to MLS.
* - Mark the conversation as active.
*
* If no common protocol is found, and we have existing Proteus conversations, we do best effort to use them as fallback.
*/
private suspend fun resolveOneOnOneConversationWithUser(otherUserId: UserId): Either<CoreFailure, Conversation> =
userRepository.userById(otherUserId).flatMap { otherUser ->
// TODO support lazily establishing mls group for team 1-1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.wire.kalium.logic.data.conversation.ConversationGroupRepository
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.message.SystemMessageInserter
import com.wire.kalium.logic.data.user.OtherUser
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.data.user.type.isTeammate
Expand All @@ -35,7 +36,19 @@ import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.kaliumLogger

interface OneOnOneMigrator {
/**
* Migrates the user's one-on-one Proteus. Without creating a new one since MLS is the default, marking it as active.
*/
suspend fun migrateExistingProteus(user: OtherUser): Either<CoreFailure, ConversationId>

/**
* Get one-on-one conversation with the user, if not found, create a new one (Proteus still default) and mark it as active.
*/
suspend fun migrateToProteus(user: OtherUser): Either<CoreFailure, ConversationId>

/**
* Perform migration of Proteus to MLS keeping history and marking the new conversation as active.
*/
suspend fun migrateToMLS(user: OtherUser): Either<CoreFailure, ConversationId>
}

Expand All @@ -44,7 +57,8 @@ internal class OneOnOneMigratorImpl(
private val conversationGroupRepository: ConversationGroupRepository,
private val conversationRepository: ConversationRepository,
private val messageRepository: MessageRepository,
private val userRepository: UserRepository
private val userRepository: UserRepository,
private val systemMessageInserter: SystemMessageInserter
) : OneOnOneMigrator {

override suspend fun migrateToProteus(user: OtherUser): Either<CoreFailure, ConversationId> =
Expand Down Expand Up @@ -87,24 +101,46 @@ internal class OneOnOneMigratorImpl(
userId = user.id
).map {
mlsConversation
}.also {
systemMessageInserter.insertProtocolChangedSystemMessage(
conversationId = mlsConversation,
senderUserId = user.id,
protocol = Conversation.Protocol.MLS
)
}
}
}
}

override suspend fun migrateExistingProteus(user: OtherUser): Either<CoreFailure, ConversationId> =
conversationRepository.getOneOnOneConversationsWithOtherUser(user.id, Conversation.Protocol.PROTEUS).flatMap { conversationIds ->
if (conversationIds.isNotEmpty()) {
val conversationId = conversationIds.first()
Either.Right(conversationId)
} else {
Either.Left(StorageFailure.DataNotFound)
}
}.flatMap { conversationId ->
if (user.activeOneOnOneConversationId != conversationId) {
kaliumLogger.d("resolved existing one-on-one to proteus, user = ${user.id.toLogString()}")
userRepository.updateActiveOneOnOneConversation(user.id, conversationId)
}
Either.Right(conversationId)
}

private suspend fun migrateOneOnOneHistory(user: OtherUser, targetConversation: ConversationId): Either<CoreFailure, Unit> {
return conversationRepository.getOneOnOneConversationsWithOtherUser(
otherUserId = user.id,
protocol = Conversation.Protocol.PROTEUS
).flatMap { proteusOneOnOneConversations ->
// We can theoretically have more than one proteus 1-1 conversation with
// team members since there was no backend safeguards against this
proteusOneOnOneConversations.foldToEitherWhileRight(Unit) { proteusOneOnOneConversation, _ ->
messageRepository.moveMessagesToAnotherConversation(
originalConversation = proteusOneOnOneConversation,
targetConversation = targetConversation
)
}
return conversationRepository.getOneOnOneConversationsWithOtherUser(
otherUserId = user.id,
protocol = Conversation.Protocol.PROTEUS
).flatMap { proteusOneOnOneConversations ->
// We can theoretically have more than one proteus 1-1 conversation with
// team members since there was no backend safeguards against this
proteusOneOnOneConversations.foldToEitherWhileRight(Unit) { proteusOneOnOneConversation, _ ->
messageRepository.moveMessagesToAnotherConversation(
originalConversation = proteusOneOnOneConversation,
targetConversation = targetConversation
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import com.wire.kalium.logic.feature.protocol.OneOnOneProtocolSelector
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.flatMapLeft
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.logic.functional.left
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.util.KaliumDispatcher
Expand Down Expand Up @@ -154,11 +156,18 @@ internal class OneOnOneResolverImpl(
if (invalidateCurrentKnownProtocols) {
userRepository.fetchUsersByIds(setOf(user.id))
}
return oneOnOneProtocolSelector.getProtocolForUser(user.id).flatMap { supportedProtocol ->
return oneOnOneProtocolSelector.getProtocolForUser(user.id).fold({ coreFailure ->
if (coreFailure is CoreFailure.NoCommonProtocolFound.OtherNeedToUpdate) {
kaliumLogger.i("Resolving existing proteus 1:1 as not matching protocol found with: ${user.id.toLogString()}")
oneOnOneMigrator.migrateExistingProteus(user)
} else {
coreFailure.left()
}
}, { supportedProtocol ->
when (supportedProtocol) {
SupportedProtocol.PROTEUS -> oneOnOneMigrator.migrateToProteus(user)
SupportedProtocol.MLS -> oneOnOneMigrator.migrateToMLS(user)
}
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.getOrNull
import com.wire.kalium.logic.kaliumLogger

internal interface OneOnOneProtocolSelector {
suspend fun getProtocolForUser(userId: UserId): Either<CoreFailure, SupportedProtocol>
Expand All @@ -41,13 +42,17 @@ internal class OneOnOneProtocolSelectorImpl(
return@flatMap Either.Left(CoreFailure.Unknown(error))
}

val teamDefaultProtocol = userConfigRepository.getDefaultProtocol().getOrNull()
val selfUserProtocols = selfUser.supportedProtocols.orEmpty()
val otherUserProtocols = otherUser.supportedProtocols.orEmpty()
val commonProtocols = userConfigRepository.getDefaultProtocol().fold({
selfUserProtocols.intersect(otherUserProtocols)
}, {
selfUserProtocols.intersect(listOf(it).toSet()).intersect(otherUserProtocols)
})
val commonProtocols = selfUserProtocols.intersect(otherUserProtocols)

kaliumLogger.withTextTag(TAG).d(
"teamDefaultProtocol = $teamDefaultProtocol, " +
"selfUserProtocols = $selfUserProtocols, " +
"otherUserProtocols = $otherUserProtocols, " +
"commonProtocols = $commonProtocols"
)

return when {
commonProtocols.contains(SupportedProtocol.MLS) -> Either.Right(SupportedProtocol.MLS)
Expand All @@ -56,4 +61,8 @@ internal class OneOnOneProtocolSelectorImpl(
else -> Either.Left(CoreFailure.NoCommonProtocolFound.SelfNeedToUpdate)
}
}

private companion object {
const val TAG = "OneOnOneProtocolSelector"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ class OneOnOneMigratorTest {
.suspendFunction(arrangement.messageRepository::moveMessagesToAnotherConversation)
.with(any(), any())
.wasNotInvoked()

verify(arrangement.systemMessageInserter)
.suspendFunction(arrangement.systemMessageInserter::insertProtocolChangedSystemMessage)
.with(any(), any(), any())
.wasNotInvoked()
}

@Test
Expand All @@ -142,7 +147,7 @@ class OneOnOneMigratorTest {
)
val failure = CoreFailure.MissingClientRegistration

val (_, oneOnOneMigrator) = arrange {
val (arrangement, oneOnOneMigrator) = arrange {
withResolveConversationReturning(Either.Left(failure))
}

Expand Down Expand Up @@ -173,6 +178,11 @@ class OneOnOneMigratorTest {
.suspendFunction(arrangement.userRepository::updateActiveOneOnOneConversation)
.with(any(), any())
.wasNotInvoked()

verify(arrangement.systemMessageInserter)
.suspendFunction(arrangement.systemMessageInserter::insertProtocolChangedSystemMessage)
.with(any(), any(), any())
.wasNotInvoked()
}

@Test
Expand Down Expand Up @@ -215,6 +225,11 @@ class OneOnOneMigratorTest {
.suspendFunction(arrangement.messageRepository::moveMessagesToAnotherConversation)
.with(eq(originalConversationId), eq(resolvedConversationId))
.wasInvoked(exactly = once)

verify(arrangement.systemMessageInserter)
.suspendFunction(arrangement.systemMessageInserter::insertProtocolChangedSystemMessage)
.with(any(), any(), any())
.wasInvoked(exactly = once)
}

@Test
Expand All @@ -238,28 +253,58 @@ class OneOnOneMigratorTest {
.suspendFunction(arrangement.userRepository::updateActiveOneOnOneConversation)
.with(eq(user.id), eq(resolvedConversationId))
.wasInvoked(exactly = once)

verify(arrangement.systemMessageInserter)
.suspendFunction(arrangement.systemMessageInserter::insertProtocolChangedSystemMessage)
.with(any(), any(), any())
.wasInvoked(exactly = once)
}

@Test
fun givenExistingTeamOneOnOne_whenMigratingToProteus_thenShouldNOTCreateGroupConversation() = runTest {
val user = TestUser.OTHER.copy(
activeOneOnOneConversationId = null
)

val (arrangement, oneOneMigrator) = arrange {
withGetOneOnOneConversationsWithOtherUserReturning(Either.Right(listOf(TestConversation.ONE_ON_ONE().id)))
withUpdateOneOnOneConversationReturning(Either.Right(Unit))
}

oneOneMigrator.migrateExistingProteus(user)
.shouldSucceed()

verify(arrangement.conversationGroupRepository)
.suspendFunction(arrangement.conversationGroupRepository::createGroupConversation)
.with(eq(null), eq(listOf(TestUser.OTHER.id)), eq(ConversationOptions()))
.wasNotInvoked()

verify(arrangement.userRepository)
.suspendFunction(arrangement.userRepository::updateActiveOneOnOneConversation)
.with(eq(TestUser.OTHER.id), eq(TestConversation.ONE_ON_ONE().id))
.wasInvoked(exactly = once)
}

private class Arrangement(private val block: Arrangement.() -> Unit) :
private class Arrangement(private val block: suspend Arrangement.() -> Unit) :
MLSOneOnOneConversationResolverArrangement by MLSOneOnOneConversationResolverArrangementImpl(),
MessageRepositoryArrangement by MessageRepositoryArrangementImpl(),
ConversationRepositoryArrangement by ConversationRepositoryArrangementImpl(),
ConversationGroupRepositoryArrangement by ConversationGroupRepositoryArrangementImpl(),
UserRepositoryArrangement by UserRepositoryArrangementImpl()
{
fun arrange() = run {
UserRepositoryArrangement by UserRepositoryArrangementImpl() {
suspend fun arrange() = run {
block()
this@Arrangement to OneOnOneMigratorImpl(
getResolvedMLSOneOnOne = mlsOneOnOneConversationResolver,
conversationGroupRepository = conversationGroupRepository,
conversationRepository = conversationRepository,
messageRepository = messageRepository,
userRepository = userRepository
userRepository = userRepository,
systemMessageInserter = systemMessageInserter
)
}
}

private companion object {
fun arrange(configuration: Arrangement.() -> Unit) = Arrangement(configuration).arrange()
suspend fun arrange(configuration: Arrangement.() -> Unit) = Arrangement(configuration).arrange()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import com.wire.kalium.logic.data.user.SupportedProtocol
import com.wire.kalium.logic.framework.TestConversation
import com.wire.kalium.logic.framework.TestUser
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.left
import com.wire.kalium.logic.functional.right
import com.wire.kalium.logic.util.arrangement.IncrementalSyncRepositoryArrangement
import com.wire.kalium.logic.util.arrangement.IncrementalSyncRepositoryArrangementImpl
import com.wire.kalium.logic.util.arrangement.mls.OneOnOneMigratorArrangement
Expand Down Expand Up @@ -255,6 +257,24 @@ class OneOnOneResolverTest {
.wasInvoked(exactly = once)
}

@Test
fun givenProtocolResolvesToOtherNeedToUpdate_whenResolveOneOnOneConversationWithUser_thenMigrateExistingToProteus() = runTest {
// given
val (arrangement, resolver) = arrange {
withGetProtocolForUser(CoreFailure.NoCommonProtocolFound.OtherNeedToUpdate.left())
withMigrateExistingToProteusReturns(TestConversation.ID.right())
}

// when
resolver.resolveOneOnOneConversationWithUser(OTHER_USER, false).shouldSucceed()

// then
verify(arrangement.oneOnOneMigrator)
.suspendFunction(arrangement.oneOnOneMigrator::migrateExistingProteus)
.with(eq(OTHER_USER))
.wasInvoked(exactly = once)
}

private class Arrangement(private val block: Arrangement.() -> Unit) :
UserRepositoryArrangement by UserRepositoryArrangementImpl(),
OneOnOneProtocolSelectorArrangement by OneOnOneProtocolSelectorArrangementImpl(),
Expand Down
Loading

0 comments on commit f9fcff1

Please sign in to comment.