Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Clear conversation content on all devices WPB-14938 #3235

Merged
merged 12 commits into from
Jan 23, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import com.wire.kalium.network.api.authenticated.conversation.model.Conversation
import com.wire.kalium.network.api.authenticated.conversation.model.ConversationReceiptModeDTO
import com.wire.kalium.network.api.base.authenticated.client.ClientApi
import com.wire.kalium.network.api.base.authenticated.conversation.ConversationApi
import com.wire.kalium.persistence.dao.MetadataDAO
import com.wire.kalium.persistence.dao.QualifiedIDEntity
import com.wire.kalium.persistence.dao.client.ClientDAO
import com.wire.kalium.persistence.dao.conversation.ConversationDAO
Expand All @@ -85,6 +86,7 @@ import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.datetime.Instant
import kotlinx.serialization.builtins.SetSerializer

@Suppress("TooManyFunctions")
interface ConversationRepository {
Expand Down Expand Up @@ -224,7 +226,6 @@ interface ConversationRepository {
): Either<CoreFailure, Unit>

suspend fun deleteConversation(conversationId: ConversationId): Either<CoreFailure, Unit>
suspend fun deleteConversationLocally(conversationId: ConversationId): Either<CoreFailure, Unit>

/**
* Deletes all conversation messages
Expand Down Expand Up @@ -315,6 +316,9 @@ interface ConversationRepository {

suspend fun getGroupStatusMembersNamesAndHandles(groupID: GroupID): Either<StorageFailure, EpochChangesData>
suspend fun selectMembersNameAndHandle(conversationId: ConversationId): Either<StorageFailure, Map<UserId, NameAndHandle>>
suspend fun addConversationToDeleteQueue(conversationId: ConversationId)
suspend fun removeConversationFromDeleteQueue(conversationId: ConversationId)
suspend fun getConversationsDeleteQueue(): List<ConversationId>
}

@Suppress("LongParameterList", "TooManyFunctions", "LargeClass")
Expand All @@ -330,6 +334,7 @@ internal class ConversationDataSource internal constructor(
private val clientDAO: ClientDAO,
private val clientApi: ClientApi,
private val conversationMetaDataDAO: ConversationMetaDataDAO,
private val metadataDAO: MetadataDAO,
private val idMapper: IdMapper = MapperProvider.idMapper(),
private val conversationMapper: ConversationMapper = MapperProvider.conversationMapper(selfUserId),
private val memberMapper: MemberMapper = MapperProvider.memberMapper(),
Expand Down Expand Up @@ -884,12 +889,6 @@ internal class ConversationDataSource internal constructor(
}
}

override suspend fun deleteConversationLocally(conversationId: ConversationId): Either<CoreFailure, Unit> {
return wrapStorageRequest {
conversationDAO.deleteConversationByQualifiedID(conversationId.toDao())
}
}

override suspend fun clearContent(conversationId: ConversationId): Either<StorageFailure, Unit> =
wrapStorageRequest {
conversationDAO.clearContent(conversationId.toDao())
Expand Down Expand Up @@ -1146,7 +1145,38 @@ internal class ConversationDataSource internal constructor(
.mapKeys { it.key.toModel() }
}

override suspend fun addConversationToDeleteQueue(conversationId: ConversationId) {
val queue = metadataDAO.getSerializable(CONVERSATIONS_TO_DELETE_KEY, SetSerializer(QualifiedIDEntity.serializer()))
?.toMutableSet()
?.plus(conversationId.toDao())
?: setOf(conversationId.toDao())

metadataDAO.putSerializable(
CONVERSATIONS_TO_DELETE_KEY,
queue,
SetSerializer(QualifiedIDEntity.serializer())
)
}

override suspend fun removeConversationFromDeleteQueue(conversationId: ConversationId) {
val queue = metadataDAO.getSerializable(CONVERSATIONS_TO_DELETE_KEY, SetSerializer(QualifiedIDEntity.serializer()))
?.toMutableSet()
?.minus(conversationId.toDao())
?: return

metadataDAO.putSerializable(
CONVERSATIONS_TO_DELETE_KEY,
queue,
SetSerializer(QualifiedIDEntity.serializer())
)
}

override suspend fun getConversationsDeleteQueue(): List<ConversationId> =
metadataDAO.getSerializable(CONVERSATIONS_TO_DELETE_KEY, SetSerializer(QualifiedIDEntity.serializer()))
?.map { it.toModel() } ?: listOf()

companion object {
const val DEFAULT_MEMBER_ROLE = "wire_member"
private const val CONVERSATIONS_TO_DELETE_KEY = "conversations_to_delete"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ class UserSessionScope internal constructor(
userStorage.database.clientDAO,
authenticatedNetworkContainer.clientApi,
userStorage.database.conversationMetaDataDAO,
userStorage.database.metadataDAO,
)

private val conversationFolderRepository: ConversationFolderRepository
Expand Down Expand Up @@ -1381,6 +1382,7 @@ class UserSessionScope internal constructor(
conversationRepository,
userId,
isMessageSentInSelfConversation,
conversations.clearConversationAssetsLocally
),
DeleteForMeHandlerImpl(messageRepository, isMessageSentInSelfConversation),
DeleteMessageHandlerImpl(messageRepository, assetRepository, NotificationEventsManagerImpl, userId),
Expand Down Expand Up @@ -1441,10 +1443,12 @@ class UserSessionScope internal constructor(
get() = MemberLeaveEventHandlerImpl(
memberDAO = userStorage.database.memberDAO,
userRepository = userRepository,
conversationRepository = conversationRepository,
persistMessage = persistMessage,
updateConversationClientsForCurrentCall = updateConversationClientsForCurrentCall,
legalHoldHandler = legalHoldHandler,
selfTeamIdProvider = selfTeamId
selfTeamIdProvider = selfTeamId,
selfUserId = userId
)
private val memberChangeHandler: MemberChangeEventHandler
get() = MemberChangeEventHandlerImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
package com.wire.kalium.logic.feature.conversation

import com.benasher44.uuid.uuid4
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.cache.SelfConversationIdProvider
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.feature.message.MessageSender
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.fold
Expand All @@ -41,11 +42,11 @@ interface ClearConversationContentUseCase {
* @param conversationId The conversation id to clear all messages.
* @return [Result] of the operation, indicating success or failure.
*/
suspend operator fun invoke(conversationId: ConversationId): Result
suspend operator fun invoke(conversationId: ConversationId, needToRemoveConversation: Boolean = false): Result

sealed class Result {
data object Success : Result()
data object Failure : Result()
data class Failure(val failure: CoreFailure) : Result()
}
}

Expand All @@ -54,33 +55,38 @@ internal class ClearConversationContentUseCaseImpl(
private val messageSender: MessageSender,
private val selfUserId: UserId,
private val currentClientIdProvider: CurrentClientIdProvider,
private val selfConversationIdProvider: SelfConversationIdProvider
private val selfConversationIdProvider: SelfConversationIdProvider,
private val clearLocalConversationAssets: ClearConversationAssetsLocallyUseCase
) : ClearConversationContentUseCase {

override suspend fun invoke(conversationId: ConversationId): ClearConversationContentUseCase.Result =
conversationRepository.clearContent(conversationId).flatMap {
currentClientIdProvider().flatMap { currentClientId ->
selfConversationIdProvider().flatMap { selfConversationIds ->
selfConversationIds.foldToEitherWhileRight(Unit) { selfConversationId, _ ->
val regularMessage = Message.Signaling(
id = uuid4().toString(),
content = MessageContent.Cleared(
conversationId = conversationId,
time = DateTimeUtil.currentInstant(),
needToRemoveLocally = false // TODO Handle in upcoming tasks
),
// sending the message to clear this conversation
conversationId = selfConversationId,
date = Clock.System.now(),
senderUserId = selfUserId,
senderClientId = currentClientId,
status = Message.Status.Pending,
isSelfMessage = true,
expirationData = null
)
messageSender.sendMessage(regularMessage)
}
override suspend fun invoke(
conversationId: ConversationId,
needToRemoveConversation: Boolean
): ClearConversationContentUseCase.Result =
currentClientIdProvider().flatMap { currentClientId ->
selfConversationIdProvider().flatMap { selfConversationIds ->
selfConversationIds.foldToEitherWhileRight(Unit) { selfConversationId, _ ->
val regularMessage = Message.Signaling(
id = uuid4().toString(),
content = MessageContent.Cleared(
conversationId = conversationId,
time = DateTimeUtil.currentInstant(),
needToRemoveLocally = needToRemoveConversation
),
// sending the message to clear this conversation
conversationId = selfConversationId,
date = Clock.System.now(),
senderUserId = selfUserId,
senderClientId = currentClientId,
status = Message.Status.Pending,
isSelfMessage = true,
expirationData = null
)
messageSender.sendMessage(regularMessage)
}
}
}.fold({ ClearConversationContentUseCase.Result.Failure }, { ClearConversationContentUseCase.Result.Success })
}
.flatMap { conversationRepository.clearContent(conversationId) }
.flatMap { clearLocalConversationAssets(conversationId) }
.fold({ ClearConversationContentUseCase.Result.Failure(it) }, { ClearConversationContentUseCase.Result.Success })
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,25 +272,26 @@ class ConversationScope internal constructor(
val updateMLSGroupsKeyingMaterials: UpdateKeyingMaterialsUseCase
get() = UpdateKeyingMaterialsUseCaseImpl(mlsConversationRepository, updateKeyingMaterialThresholdProvider)

val clearConversationAssetsLocally: ClearConversationAssetsLocallyUseCase
get() = ClearConversationAssetsLocallyUseCaseImpl(
messageRepository,
assetRepository
)

val clearConversationContent: ClearConversationContentUseCase
get() = ClearConversationContentUseCaseImpl(
conversationRepository,
messageSender,
selfUserId,
currentClientIdProvider,
selfConversationIdProvider
)

val clearConversationAssetsLocally: ClearConversationAssetsLocallyUseCase
get() = ClearConversationAssetsLocallyUseCaseImpl(
messageRepository,
assetRepository
selfConversationIdProvider,
clearConversationAssetsLocally
)

val deleteConversationLocallyUseCase: DeleteConversationLocallyUseCase
get() = DeleteConversationLocallyUseCaseImpl(
conversationRepository,
clearConversationAssetsLocally
clearConversationContent,
conversationRepository
)

val joinConversationViaCode: JoinConversationViaCodeUseCase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,32 @@ import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.left

interface DeleteConversationLocallyUseCase {
/**
* Delete local conversation which:
* - Clear all local assets
* - Clear content
* - Remove conversation
* - Send Signal message to other clients to do the same
*
* @param conversationId - id of conversation to delete
*/
suspend operator fun invoke(conversationId: ConversationId): Either<CoreFailure, Unit>
}

internal class DeleteConversationLocallyUseCaseImpl(
private val clearConversationContent: ClearConversationContentUseCase,
private val conversationRepository: ConversationRepository,
private val clearLocalConversationAssets: ClearConversationAssetsLocallyUseCase
) : DeleteConversationLocallyUseCase {

override suspend fun invoke(conversationId: ConversationId): Either<CoreFailure, Unit> {
return clearLocalConversationAssets(conversationId)
.flatMap { conversationRepository.clearContent(conversationId) }
.flatMap { conversationRepository.deleteConversationLocally(conversationId) }
val clearResult = clearConversationContent(conversationId, true)
return if (clearResult is ClearConversationContentUseCase.Result.Failure) {
clearResult.failure.left()
} else {
conversationRepository.deleteConversation(conversationId)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.wire.kalium.logic.sync.receiver.conversation

import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.event.Event
import com.wire.kalium.logic.data.event.MemberLeaveReason
import com.wire.kalium.logic.data.id.ConversationId
Expand Down Expand Up @@ -46,26 +47,27 @@ interface MemberLeaveEventHandler {
suspend fun handle(event: Event.Conversation.MemberLeave): Either<CoreFailure, Unit>
}

@Suppress("LongParameterList")
internal class MemberLeaveEventHandlerImpl(
private val memberDAO: MemberDAO,
private val userRepository: UserRepository,
private val conversationRepository: ConversationRepository,
private val persistMessage: PersistMessageUseCase,
private val updateConversationClientsForCurrentCall: Lazy<UpdateConversationClientsForCurrentCallUseCase>,
private val legalHoldHandler: LegalHoldHandler,
private val selfTeamIdProvider: SelfTeamIdProvider
private val selfTeamIdProvider: SelfTeamIdProvider,
private val selfUserId: UserId,
) : MemberLeaveEventHandler {

override suspend fun handle(event: Event.Conversation.MemberLeave): Either<CoreFailure, Unit> {
val eventLogger = kaliumLogger.createEventProcessingLogger(event)
return let {
if (event.reason == MemberLeaveReason.UserDeleted) {
userRepository.markAsDeleted(event.removedList)
}
deleteMembers(event.removedList, event.conversationId)
if (event.reason == MemberLeaveReason.UserDeleted) {
userRepository.markAsDeleted(event.removedList)
}
return deleteMembers(event.removedList, event.conversationId)
.onSuccess { updateConversationClientsForCurrentCall.value(event.conversationId) }
.onSuccess { deleteConversationIfNeeded(event) }
.onSuccess {
updateConversationClientsForCurrentCall.value(event.conversationId)
}.onSuccess {
// fetch required unknown users that haven't been persisted during slow sync, e.g. from another team
// and keep them to properly show this member-leave message
userRepository.fetchUsersIfUnknownByIds(event.removedList.toSet())
Expand Down Expand Up @@ -131,4 +133,16 @@ internal class MemberLeaveEventHandlerImpl(
conversationID.toDao()
)
}

private suspend fun deleteConversationIfNeeded(event: Event.Conversation.MemberLeave) {
val isSelfUserLeftConversation = event.removedList == listOf(selfUserId) && event.reason == MemberLeaveReason.Left
if (!isSelfUserLeftConversation) return

if (!conversationRepository.getConversationsDeleteQueue().contains(event.conversationId)) return

// User wanted to delete conversation fully, but MessageContent.Cleared event came before and we couldn't delete it then.
// Now, when user left the conversation, we can delete it.
conversationRepository.deleteConversation(event.conversationId)
conversationRepository.removeConversationFromDeleteQueue(event.conversationId)
}
}
Loading
Loading