From 9f3cf5c4264bd99b2f493a665e4b80384a1fd31f Mon Sep 17 00:00:00 2001 From: "sergei.bakhtiarov" Date: Tue, 14 Jan 2025 16:46:34 +0100 Subject: [PATCH] fix: refactoring self user fetch [#WPB-15190] --- .../wire/kalium/presentation/MainActivity.kt | 2 +- .../cli/commands/GenerateEventsCommand.kt | 2 +- .../kalium/logic/data/user/UserRepository.kt | 114 +++++++++++------- .../logic/feature/user/GetSelfUserUseCase.kt | 10 +- .../feature/user/ObserveSelfUserUseCase.kt | 48 ++++++++ .../kalium/logic/feature/user/UserScope.kt | 1 + .../wire/kalium/persistence/dao/UserDAO.kt | 2 +- .../kalium/persistence/dao/UserDAOImpl.kt | 7 ++ .../kotlin/PocIntegrationTest.kt | 2 +- 9 files changed, 137 insertions(+), 51 deletions(-) create mode 100644 logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/ObserveSelfUserUseCase.kt diff --git a/android/src/main/kotlin/com/wire/kalium/presentation/MainActivity.kt b/android/src/main/kotlin/com/wire/kalium/presentation/MainActivity.kt index b2be31d30b2..475747033d9 100644 --- a/android/src/main/kotlin/com/wire/kalium/presentation/MainActivity.kt +++ b/android/src/main/kotlin/com/wire/kalium/presentation/MainActivity.kt @@ -83,7 +83,7 @@ class MainActivity : ComponentActivity() { session.users.uploadUserAvatar(tempAvatarPath, imageContent.size.toLong()) - val selfUser = session.users.getSelfUser().first() + val selfUser = session.users.observeSelfUser().first() val avatarAsset = when (val publicAsset = session.users.getPublicAsset(selfUser.previewPicture!!)) { is PublicAssetResult.Success -> { diff --git a/cli/src/commonMain/kotlin/com/wire/kalium/cli/commands/GenerateEventsCommand.kt b/cli/src/commonMain/kotlin/com/wire/kalium/cli/commands/GenerateEventsCommand.kt index c3adb8a5672..4f5052e47fd 100644 --- a/cli/src/commonMain/kotlin/com/wire/kalium/cli/commands/GenerateEventsCommand.kt +++ b/cli/src/commonMain/kotlin/com/wire/kalium/cli/commands/GenerateEventsCommand.kt @@ -58,7 +58,7 @@ class GenerateEventsCommand : CliktCommand(name = "generate-events") { } override fun run() = runBlocking { - val selfUserId = userSession.users.getSelfUser().first().id + val selfUserId = userSession.users.observeSelfUser().first().id val selfClientId = userSession.clientIdProvider().getOrFail { throw PrintMessage("No self client is registered") } val targetUserId = UserId(value = targetUserId, domain = selfUserId.domain) val targetClientId = ClientId(targetClientId) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/user/UserRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/user/UserRepository.kt index 51f1549d955..8b383d2b203 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/user/UserRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/user/UserRepository.kt @@ -81,13 +81,17 @@ import com.wire.kalium.persistence.dao.UserIDEntity import com.wire.kalium.persistence.dao.UserTypeEntity import com.wire.kalium.persistence.dao.client.ClientDAO import com.wire.kalium.util.DateTimeUtil +import kotlinx.coroutines.Deferred import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.filterNotNull -import kotlinx.coroutines.flow.firstOrNull -import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.datetime.Instant import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json @@ -200,6 +204,9 @@ internal class UserDataSource internal constructor( */ private val userDetailsRefreshInstantCache = ConcurrentMutableMap() + private var userRefresh: Deferred>? = null + private val userRefreshMutex: Mutex = Mutex() + override suspend fun fetchSelfUser(): Either = wrapApiRequest { selfApi.getSelfInfo() } .flatMap { selfUserDTO -> selfUserDTO.teamId.let { selfUserTeamId -> @@ -231,7 +238,6 @@ internal class UserDataSource internal constructor( private suspend fun updateSelfUserProviderAccountInfo(userDTO: SelfUserDTO): Either = sessionRepository.updateSsoIdAndScimInfo(userDTO.id.toModel(), idMapper.toSsoId(userDTO.ssoID), userDTO.managedByDTO) - // TODO: race condition, if we request the same user (can happen for self) multiple times, we will fetch it multiple times override suspend fun getKnownUser(userId: UserId): Flow = userDAO.observeUserDetailsByQualifiedID(qualifiedID = userId.toDao()) .map { userEntity -> @@ -247,24 +253,35 @@ internal class UserDataSource internal constructor( .map(userMapper::fromUserEntityToOtherUser) } + private suspend fun refreshUserDetailsIfNeeded(userId: UserId): Either = coroutineScope { + userRefreshMutex.withLock { + val now = DateTimeUtil.currentInstant() + val wasFetchedRecently = userDetailsRefreshInstantCache[userId]?.let { now < it + USER_DETAILS_MAX_AGE } ?: false + + if (wasFetchedRecently) return@coroutineScope Either.Right(Unit) + + if (userRefresh?.isActive == true) return@coroutineScope userRefresh?.await() ?: error("") + + userRefresh = async { refreshUserDetails(userId) } + } + + userRefresh?.await() ?: error("") + } + /** * Only refresh user profiles if it wasn't fetched recently. * * @see userDetailsRefreshInstantCache * @see USER_DETAILS_MAX_AGE */ - private suspend fun refreshUserDetailsIfNeeded(userId: UserId): Either { - val now = DateTimeUtil.currentInstant() - val wasFetchedRecently = userDetailsRefreshInstantCache[userId]?.let { now < it + USER_DETAILS_MAX_AGE } ?: false - return if (!wasFetchedRecently) { - when (userId) { - selfUserId -> fetchSelfUser() - else -> fetchUserInfo(userId) - }.also { - kaliumLogger.d("Refreshing user info from API after $USER_DETAILS_MAX_AGE") - userDetailsRefreshInstantCache[userId] = now - } - } else Either.Right(Unit) + private suspend fun refreshUserDetails(userId: UserId): Either { + return when (userId) { + selfUserId -> fetchSelfUser() + else -> fetchUserInfo(userId) + }.also { + kaliumLogger.d("Refreshing user info from API after $USER_DETAILS_MAX_AGE") + userDetailsRefreshInstantCache[userId] = DateTimeUtil.currentInstant() + } } override suspend fun fetchAllOtherUsers(): Either { @@ -417,40 +434,52 @@ internal class UserDataSource internal constructor( else fetchUsersByIds(missingIds.map { it.toModel() }.toSet()).map { } } - // TODO: this can cause many issues since it will - @OptIn(ExperimentalCoroutinesApi::class) override suspend fun observeSelfUser(): Flow { - return metadataDAO.valueByKeyFlow(SELF_USER_ID_KEY).onEach { - // If the self user is not in the database, proactively fetch it. - if (it == null) { - val logPrefix = "Observing self user before insertion" - kaliumLogger.w("$logPrefix: Triggering a fetch.") - fetchSelfUser().fold({ failure -> - kaliumLogger.e("""$logPrefix failed: {"failure":"$failure"}""") - }, { - kaliumLogger.i("$logPrefix: Succeeded") - userDetailsRefreshInstantCache[selfUserId] = DateTimeUtil.currentInstant() - }) - } else { - refreshUserDetailsIfNeeded(selfUserId) - } - }.filterNotNull().flatMapMerge { encodedValue -> - val selfUserID: QualifiedIDEntity = Json.decodeFromString(encodedValue) - userDAO.observeUserDetailsByQualifiedID(selfUserID) - .filterNotNull() + + val selfUser = getOrFetchSelfUserId() + + return if (selfUser != null) { + userDAO.observeUserDetailsByQualifiedID(selfUser).filterNotNull() .map(userMapper::fromUserDetailsEntityToSelfUser) + } else { + emptyFlow() } } + private suspend fun getOrFetchSelfUserId(): QualifiedIDEntity? { + + var userId = metadataDAO.valueByKey(SELF_USER_ID_KEY) + + if (userId == null) { + + val logPrefix = "Observing self user before insertion" + kaliumLogger.w("$logPrefix: Triggering a fetch.") + + fetchSelfUser().fold({ failure -> + kaliumLogger.e("""$logPrefix failed: {"failure":"$failure"}""") + }, { + kaliumLogger.i("$logPrefix: Succeeded") + userDetailsRefreshInstantCache[selfUserId] = DateTimeUtil.currentInstant() + }) + } else { + refreshUserDetailsIfNeeded(selfUserId) + } + + return metadataDAO.valueByKey(SELF_USER_ID_KEY)?.let { Json.decodeFromString(it) } + } + @OptIn(ExperimentalCoroutinesApi::class) override suspend fun observeSelfUserWithTeam(): Flow> { - return metadataDAO.valueByKeyFlow(SELF_USER_ID_KEY).filterNotNull().flatMapMerge { encodedValue -> - val selfUserID: QualifiedIDEntity = Json.decodeFromString(encodedValue) - userDAO.getUserDetailsWithTeamByQualifiedID(selfUserID) - .filterNotNull() + + val selfUser = getOrFetchSelfUserId() + + return if (selfUser != null) { + userDAO.getUserDetailsWithTeamByQualifiedID(selfUser).filterNotNull() .map { (user, team) -> userMapper.fromUserDetailsEntityToSelfUser(user) to team?.let { teamMapper.fromDaoModelToTeam(it) } } + } else { + emptyFlow() } } @@ -470,9 +499,12 @@ internal class UserDataSource internal constructor( }.map { } } - // TODO: replace the flow with selfUser and cache it override suspend fun getSelfUser(): SelfUser? { - return observeSelfUser().firstOrNull() + return getOrFetchSelfUserId()?.let { + userDAO.getUserDetailsByQualifiedID(it)?.let { + userMapper.fromUserDetailsEntityToSelfUser(it) + } + } } override suspend fun observeAllKnownUsers(): Flow>> { diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/GetSelfUserUseCase.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/GetSelfUserUseCase.kt index 4f1b36d9d81..e20aab97481 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/GetSelfUserUseCase.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/GetSelfUserUseCase.kt @@ -22,19 +22,17 @@ import com.wire.kalium.logic.data.user.SelfUser import com.wire.kalium.logic.data.user.UserRepository import com.wire.kalium.util.KaliumDispatcher import com.wire.kalium.util.KaliumDispatcherImpl -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.withContext /** * This use case is responsible for retrieving the current user. - * fixme: Rename to ObserveSelfUserUseCase */ interface GetSelfUserUseCase { /** - * @return a [Flow] of the current user [SelfUser] + * @return current user [SelfUser] */ - suspend operator fun invoke(): Flow + suspend operator fun invoke(): SelfUser? } @@ -43,7 +41,7 @@ internal class GetSelfUserUseCaseImpl internal constructor( private val dispatcher: KaliumDispatcher = KaliumDispatcherImpl ) : GetSelfUserUseCase { - override suspend operator fun invoke(): Flow = withContext(dispatcher.io) { - userRepository.observeSelfUser() + override suspend operator fun invoke(): SelfUser? = withContext(dispatcher.io) { + userRepository.getSelfUser() } } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/ObserveSelfUserUseCase.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/ObserveSelfUserUseCase.kt new file mode 100644 index 00000000000..e2e63f8f00e --- /dev/null +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/ObserveSelfUserUseCase.kt @@ -0,0 +1,48 @@ +/* + * Wire + * Copyright (C) 2024 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ + +package com.wire.kalium.logic.feature.user + +import com.wire.kalium.logic.data.user.SelfUser +import com.wire.kalium.logic.data.user.UserRepository +import com.wire.kalium.util.KaliumDispatcher +import com.wire.kalium.util.KaliumDispatcherImpl +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.withContext + +/** + * This use case is responsible for observing the current user. + */ +interface ObserveSelfUserUseCase { + + /** + * @return a [Flow] of the current user [SelfUser] + */ + suspend operator fun invoke(): Flow + +} + +internal class ObserveSelfUserUseCaseImpl internal constructor( + private val userRepository: UserRepository, + private val dispatcher: KaliumDispatcher = KaliumDispatcherImpl +) : ObserveSelfUserUseCase { + + override suspend operator fun invoke(): Flow = withContext(dispatcher.io) { + userRepository.observeSelfUser() + } +} diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/UserScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/UserScope.kt index 7b5552e66fc..49f84782c61 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/UserScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/UserScope.kt @@ -123,6 +123,7 @@ class UserScope internal constructor( ) { private val validateUserHandleUseCase: ValidateUserHandleUseCase get() = ValidateUserHandleUseCaseImpl() val getSelfUser: GetSelfUserUseCase get() = GetSelfUserUseCaseImpl(userRepository) + val observeSelfUser: ObserveSelfUserUseCase get() = ObserveSelfUserUseCaseImpl(userRepository) val getSelfUserWithTeam: ObserveSelfUserWithTeamUseCase get() = ObserveSelfUserWithTeamUseCaseImpl(userRepository) val observeUserInfo: ObserveUserInfoUseCase get() = ObserveUserInfoUseCaseImpl(userRepository, teamRepository) val uploadUserAvatar: UploadUserAvatarUseCase get() = UploadUserAvatarUseCaseImpl(userRepository, assetRepository) diff --git a/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/UserDAO.kt b/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/UserDAO.kt index c92b50621e0..858c7428d8a 100644 --- a/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/UserDAO.kt +++ b/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/UserDAO.kt @@ -20,7 +20,6 @@ package com.wire.kalium.persistence.dao import com.wire.kalium.logger.obfuscateDomain import com.wire.kalium.logger.obfuscateId -import com.wire.kalium.persistence.dao.ManagedByEntity.WIRE import com.wire.kalium.persistence.dao.conversation.NameAndHandleEntity import kotlinx.coroutines.flow.Flow import kotlinx.datetime.Instant @@ -258,6 +257,7 @@ interface UserDAO { suspend fun observeUserDetailsByQualifiedID(qualifiedID: QualifiedIDEntity): Flow suspend fun getUserDetailsWithTeamByQualifiedID(qualifiedID: QualifiedIDEntity): Flow?> suspend fun getUserMinimizedByQualifiedID(qualifiedID: QualifiedIDEntity): UserEntityMinimized? + suspend fun getUserDetailsByQualifiedID(qualifiedID: QualifiedIDEntity): UserDetailsEntity? suspend fun getUsersDetailsByQualifiedIDList(qualifiedIDList: List): List suspend fun getUserDetailsByNameOrHandleOrEmailAndConnectionStates( searchQuery: String, diff --git a/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/UserDAOImpl.kt b/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/UserDAOImpl.kt index f601dcc701c..e14128e0d6f 100644 --- a/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/UserDAOImpl.kt +++ b/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/UserDAOImpl.kt @@ -296,6 +296,13 @@ class UserDAOImpl internal constructor( }.executeAsList() } + override suspend fun getUserDetailsByQualifiedID(qualifiedID: QualifiedIDEntity): UserDetailsEntity? = + withContext(queriesContext) { + userQueries.selectDetailsByQualifiedId(listOf(qualifiedID)) + .executeAsOneOrNull() + ?.let { mapper.toDetailsModel(it) } + } + override suspend fun getUsersDetailsByQualifiedIDList(qualifiedIDList: List): List = withContext(queriesContext) { userQueries.selectDetailsByQualifiedId(qualifiedIDList) diff --git a/tango-tests/src/integrationTest/kotlin/PocIntegrationTest.kt b/tango-tests/src/integrationTest/kotlin/PocIntegrationTest.kt index 270ea80e43e..d62e582cb27 100644 --- a/tango-tests/src/integrationTest/kotlin/PocIntegrationTest.kt +++ b/tango-tests/src/integrationTest/kotlin/PocIntegrationTest.kt @@ -139,7 +139,7 @@ class PocIntegrationTest { launch { val userSession = initUserSession(createCoreLogic(mockedRequests)) - val selfUserId = userSession.users.getSelfUser().first().id + val selfUserId = userSession.users.observeSelfUser().first().id val selfClientId = userSession.clientIdProvider().getOrFail { throw IllegalStateException("No self client is registered") } val targetUserId = UserId(value = selfUserId.value, domain = selfUserId.domain)