Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fallback to proteus if conversation already present but MLS is default (WPB-15191) #3200

Merged
merged 3 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ import kotlinx.coroutines.flow.first
/**
* Operation that creates one-to-one Conversation with specific [UserId] (only if it is absent in local DB)
* and returns [Conversation] data.
*
* @param otherUserId [UserId] private conversation with which we are interested in.
* @return Result with [Conversation] in case of success, or [CoreFailure] if something went wrong:
* can't get data from local DB, or can't create a conversation.
*/
interface GetOrCreateOneToOneConversationUseCase {
suspend operator fun invoke(otherUserId: UserId): CreateConversationResult
Expand All @@ -47,6 +43,14 @@ internal class GetOrCreateOneToOneConversationUseCaseImpl(
private val userRepository: UserRepository,
private val oneOnOneResolver: OneOnOneResolver
) : GetOrCreateOneToOneConversationUseCase {

/**
* The use case operation operation params and return type.
*
* @param otherUserId [UserId] private conversation with which we are interested in.
* @return Result with [Conversation] in case of success, or [CoreFailure] if something went wrong:
* can't get data from local DB, or can't create a conversation.
*/
override suspend operator fun invoke(otherUserId: UserId): CreateConversationResult {
// TODO periodically re-resolve one-on-one
return conversationRepository.observeOneToOneConversationWithOtherUser(otherUserId)
Expand All @@ -66,6 +70,18 @@ internal class GetOrCreateOneToOneConversationUseCaseImpl(
})
}

/**
* Resolves one-on-one conversation with the user.
* Resolving conversations is the process of:
*
* - Intersecting the supported protocols of the self user and the other user.
* - Selecting the common protocol, based on the team settings with the highest priority.
* - Get or create a conversation with the other user.
* - If the protocol now is MLS, migrate the existing Proteus conversation to MLS.
* - Mark the conversation as active.
*
* If no common protocol is found, and we have existing Proteus conversations, we do best effort to use them as fallback.
*/
private suspend fun resolveOneOnOneConversationWithUser(otherUserId: UserId): Either<CoreFailure, Conversation> =
userRepository.userById(otherUserId).flatMap { otherUser ->
// TODO support lazily establishing mls group for team 1-1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,19 @@ import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.kaliumLogger

interface OneOnOneMigrator {
/**
* Migrates the user's one-on-one Proteus. Without creating a new one since MLS is the default, marking it as active.
*/
suspend fun migrateExistingProteus(user: OtherUser): Either<CoreFailure, ConversationId>

/**
* Get one-on-one conversation with the user, if not found, create a new one (Proteus still default) and mark it as active.
*/
suspend fun migrateToProteus(user: OtherUser): Either<CoreFailure, ConversationId>

/**
* Perform migration of Proteus to MLS keeping history and marking the new conversation as active.
*/
suspend fun migrateToMLS(user: OtherUser): Either<CoreFailure, ConversationId>
}

Expand Down Expand Up @@ -100,19 +112,35 @@ internal class OneOnOneMigratorImpl(
}
}

override suspend fun migrateExistingProteus(user: OtherUser): Either<CoreFailure, ConversationId> =
conversationRepository.getOneOnOneConversationsWithOtherUser(user.id, Conversation.Protocol.PROTEUS).flatMap { conversationIds ->
if (conversationIds.isNotEmpty()) {
val conversationId = conversationIds.first()
Either.Right(conversationId)
} else {
Either.Left(StorageFailure.DataNotFound)
}
}.flatMap { conversationId ->
if (user.activeOneOnOneConversationId != conversationId) {
kaliumLogger.d("resolved existing one-on-one to proteus, user = ${user.id.toLogString()}")
userRepository.updateActiveOneOnOneConversation(user.id, conversationId)
}
Either.Right(conversationId)
}

private suspend fun migrateOneOnOneHistory(user: OtherUser, targetConversation: ConversationId): Either<CoreFailure, Unit> {
return conversationRepository.getOneOnOneConversationsWithOtherUser(
otherUserId = user.id,
protocol = Conversation.Protocol.PROTEUS
).flatMap { proteusOneOnOneConversations ->
// We can theoretically have more than one proteus 1-1 conversation with
// team members since there was no backend safeguards against this
proteusOneOnOneConversations.foldToEitherWhileRight(Unit) { proteusOneOnOneConversation, _ ->
messageRepository.moveMessagesToAnotherConversation(
originalConversation = proteusOneOnOneConversation,
targetConversation = targetConversation
)
}
return conversationRepository.getOneOnOneConversationsWithOtherUser(
otherUserId = user.id,
protocol = Conversation.Protocol.PROTEUS
).flatMap { proteusOneOnOneConversations ->
// We can theoretically have more than one proteus 1-1 conversation with
// team members since there was no backend safeguards against this
proteusOneOnOneConversations.foldToEitherWhileRight(Unit) { proteusOneOnOneConversation, _ ->
messageRepository.moveMessagesToAnotherConversation(
originalConversation = proteusOneOnOneConversation,
targetConversation = targetConversation
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import com.wire.kalium.logic.feature.protocol.OneOnOneProtocolSelector
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.flatMapLeft
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.logic.functional.left
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.util.KaliumDispatcher
Expand Down Expand Up @@ -154,11 +156,18 @@ internal class OneOnOneResolverImpl(
if (invalidateCurrentKnownProtocols) {
userRepository.fetchUsersByIds(setOf(user.id))
}
return oneOnOneProtocolSelector.getProtocolForUser(user.id).flatMap { supportedProtocol ->
return oneOnOneProtocolSelector.getProtocolForUser(user.id).fold({ coreFailure ->
if (coreFailure is CoreFailure.NoCommonProtocolFound.OtherNeedToUpdate) {
kaliumLogger.i("Resolving existing proteus 1:1 as not matching protocol found with: ${user.id.toLogString()}")
oneOnOneMigrator.migrateExistingProteus(user)
} else {
coreFailure.left()
}
}, { supportedProtocol ->
when (supportedProtocol) {
SupportedProtocol.PROTEUS -> oneOnOneMigrator.migrateToProteus(user)
SupportedProtocol.MLS -> oneOnOneMigrator.migrateToMLS(user)
}
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,6 @@ class OneOnOneMigratorTest {
}.wasNotInvoked()
}

@Test
fun givenUnassignedOneOnOne_whenMigratingToProteus_thenShouldAssignOneOnOneConversation() = runTest {
val user = TestUser.OTHER.copy(
activeOneOnOneConversationId = null
)

val (arrangement, oneOneMigrator) = arrange {
withGetOneOnOneConversationsWithOtherUserReturning(Either.Right(listOf(TestConversation.ID)))
withUpdateOneOnOneConversationReturning(Either.Right(Unit))
}

oneOneMigrator.migrateToProteus(user)
.shouldSucceed()

coVerify {
arrangement.userRepository.updateActiveOneOnOneConversation(eq(user.id), eq(TestConversation.ID))
}.wasInvoked()
}

@Test
fun givenNoExistingTeamOneOnOne_whenMigratingToProteus_thenShouldCreateGroupConversation() = runTest {
val user = TestUser.OTHER.copy(
Expand Down Expand Up @@ -252,6 +233,33 @@ class OneOnOneMigratorTest {
}.wasInvoked(exactly = once)
}

@Test
fun givenExistingTeamOneOnOne_whenMigratingToProteus_thenShouldNOTCreateGroupConversation() = runTest {
val user = TestUser.OTHER.copy(
activeOneOnOneConversationId = null
)

val (arrangement, oneOneMigrator) = arrange {
withGetOneOnOneConversationsWithOtherUserReturning(Either.Right(listOf(TestConversation.ONE_ON_ONE().id)))
withUpdateOneOnOneConversationReturning(Either.Right(Unit))
}

oneOneMigrator.migrateExistingProteus(user)
.shouldSucceed()

coVerify {
arrangement.conversationGroupRepository.createGroupConversation(
name = eq<String?>(null),
usersList = eq(listOf(TestUser.OTHER.id)),
options = eq(ConversationOptions())
)
}.wasNotInvoked()

coVerify {
arrangement.userRepository.updateActiveOneOnOneConversation(eq(TestUser.OTHER.id), eq(TestConversation.ONE_ON_ONE().id))
}.wasInvoked()
}

private class Arrangement(private val block: suspend Arrangement.() -> Unit) :
MLSOneOnOneConversationResolverArrangement by MLSOneOnOneConversationResolverArrangementImpl(),
MessageRepositoryArrangement by MessageRepositoryArrangementImpl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import com.wire.kalium.logic.data.user.SupportedProtocol
import com.wire.kalium.logic.framework.TestConversation
import com.wire.kalium.logic.framework.TestUser
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.left
import com.wire.kalium.logic.functional.right
import com.wire.kalium.logic.util.arrangement.IncrementalSyncRepositoryArrangement
import com.wire.kalium.logic.util.arrangement.IncrementalSyncRepositoryArrangementImpl
import com.wire.kalium.logic.util.arrangement.mls.OneOnOneMigratorArrangement
Expand Down Expand Up @@ -244,6 +246,23 @@ class OneOnOneResolverTest {
}.wasInvoked(exactly = once)
}

@Test
fun givenProtocolResolvesToOtherNeedToUpdate_whenResolveOneOnOneConversationWithUser_thenMigrateExistingToProteus() = runTest {
// given
val (arrangement, resolver) = arrange {
withGetProtocolForUser(CoreFailure.NoCommonProtocolFound.OtherNeedToUpdate.left())
withMigrateExistingToProteusReturns(TestConversation.ID.right())
}

// when
resolver.resolveOneOnOneConversationWithUser(OTHER_USER, false).shouldSucceed()

// then
coVerify {
arrangement.oneOnOneMigrator.migrateExistingProteus(eq(OTHER_USER))
}.wasInvoked(exactly = once)
}

private class Arrangement(private val block: suspend Arrangement.() -> Unit) :
UserRepositoryArrangement by UserRepositoryArrangementImpl(),
OneOnOneProtocolSelectorArrangement by OneOnOneProtocolSelectorArrangementImpl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ interface OneOnOneMigratorArrangement {
suspend fun withMigrateToMLSReturns(result: Either<CoreFailure, ConversationId>)

suspend fun withMigrateToProteusReturns(result: Either<CoreFailure, ConversationId>)

suspend fun withMigrateExistingToProteusReturns(result: Either<CoreFailure, ConversationId>)
}

class OneOnOneMigratorArrangementImpl : OneOnOneMigratorArrangement {
Expand All @@ -51,4 +53,10 @@ class OneOnOneMigratorArrangementImpl : OneOnOneMigratorArrangement {
oneOnOneMigrator.migrateToProteus(any())
}.returns(result)
}

override suspend fun withMigrateExistingToProteusReturns(result: Either<CoreFailure, ConversationId>) {
coEvery {
oneOnOneMigrator.migrateExistingProteus(any())
}.returns(result)
}
}
Loading