Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refactoring self user fetch [#WPB-15190] #3229

Merged
merged 3 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class MainActivity : ComponentActivity() {

session.users.uploadUserAvatar(tempAvatarPath, imageContent.size.toLong())

val selfUser = session.users.getSelfUser().first()
val selfUser = session.users.observeSelfUser().first()

val avatarAsset = when (val publicAsset = session.users.getPublicAsset(selfUser.previewPicture!!)) {
is PublicAssetResult.Success -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class GenerateEventsCommand : CliktCommand(name = "generate-events") {
}

override fun run() = runBlocking {
val selfUserId = userSession.users.getSelfUser().first().id
val selfUserId = userSession.users.observeSelfUser().first().id
val selfClientId = userSession.clientIdProvider().getOrFail { throw PrintMessage("No self client is registered") }
val targetUserId = UserId(value = targetUserId, domain = selfUserId.domain)
val targetClientId = ClientId(targetClientId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,17 @@ import com.wire.kalium.persistence.dao.UserIDEntity
import com.wire.kalium.persistence.dao.UserTypeEntity
import com.wire.kalium.persistence.dao.client.ClientDAO
import com.wire.kalium.util.DateTimeUtil
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
Expand Down Expand Up @@ -200,6 +204,9 @@ internal class UserDataSource internal constructor(
*/
private val userDetailsRefreshInstantCache = ConcurrentMutableMap<UserId, Instant>()

private var userRefresh: Deferred<Either<CoreFailure, Unit>>? = null
private val userRefreshMutex: Mutex = Mutex()

override suspend fun fetchSelfUser(): Either<CoreFailure, Unit> = wrapApiRequest { selfApi.getSelfInfo() }
.flatMap { selfUserDTO ->
selfUserDTO.teamId.let { selfUserTeamId ->
Expand Down Expand Up @@ -231,7 +238,6 @@ internal class UserDataSource internal constructor(
private suspend fun updateSelfUserProviderAccountInfo(userDTO: SelfUserDTO): Either<StorageFailure, Unit> =
sessionRepository.updateSsoIdAndScimInfo(userDTO.id.toModel(), idMapper.toSsoId(userDTO.ssoID), userDTO.managedByDTO)

// TODO: race condition, if we request the same user (can happen for self) multiple times, we will fetch it multiple times
override suspend fun getKnownUser(userId: UserId): Flow<OtherUser?> =
userDAO.observeUserDetailsByQualifiedID(qualifiedID = userId.toDao())
.map { userEntity ->
Expand All @@ -247,24 +253,35 @@ internal class UserDataSource internal constructor(
.map(userMapper::fromUserEntityToOtherUser)
}

private suspend fun refreshUserDetailsIfNeeded(userId: UserId): Either<CoreFailure, Unit> = coroutineScope {
userRefreshMutex.withLock {
val now = DateTimeUtil.currentInstant()
val wasFetchedRecently = userDetailsRefreshInstantCache[userId]?.let { now < it + USER_DETAILS_MAX_AGE } ?: false

if (wasFetchedRecently) return@coroutineScope Either.Right(Unit)

if (userRefresh?.isActive == true) return@coroutineScope userRefresh?.await() ?: error("")

userRefresh = async { refreshUserDetails(userId) }
}

userRefresh?.await() ?: error("")
}

/**
* Only refresh user profiles if it wasn't fetched recently.
*
* @see userDetailsRefreshInstantCache
* @see USER_DETAILS_MAX_AGE
*/
private suspend fun refreshUserDetailsIfNeeded(userId: UserId): Either<CoreFailure, Unit> {
val now = DateTimeUtil.currentInstant()
val wasFetchedRecently = userDetailsRefreshInstantCache[userId]?.let { now < it + USER_DETAILS_MAX_AGE } ?: false
return if (!wasFetchedRecently) {
when (userId) {
selfUserId -> fetchSelfUser()
else -> fetchUserInfo(userId)
}.also {
kaliumLogger.d("Refreshing user info from API after $USER_DETAILS_MAX_AGE")
userDetailsRefreshInstantCache[userId] = now
}
} else Either.Right(Unit)
private suspend fun refreshUserDetails(userId: UserId): Either<CoreFailure, Unit> {
return when (userId) {
selfUserId -> fetchSelfUser()
else -> fetchUserInfo(userId)
}.also {
kaliumLogger.d("Refreshing user info from API after $USER_DETAILS_MAX_AGE")
userDetailsRefreshInstantCache[userId] = DateTimeUtil.currentInstant()
}
}

override suspend fun fetchAllOtherUsers(): Either<CoreFailure, Unit> {
Expand Down Expand Up @@ -417,40 +434,52 @@ internal class UserDataSource internal constructor(
else fetchUsersByIds(missingIds.map { it.toModel() }.toSet()).map { }
}

// TODO: this can cause many issues since it will
@OptIn(ExperimentalCoroutinesApi::class)
override suspend fun observeSelfUser(): Flow<SelfUser> {
return metadataDAO.valueByKeyFlow(SELF_USER_ID_KEY).onEach {
// If the self user is not in the database, proactively fetch it.
if (it == null) {
val logPrefix = "Observing self user before insertion"
kaliumLogger.w("$logPrefix: Triggering a fetch.")
fetchSelfUser().fold({ failure ->
kaliumLogger.e("""$logPrefix failed: {"failure":"$failure"}""")
}, {
kaliumLogger.i("$logPrefix: Succeeded")
userDetailsRefreshInstantCache[selfUserId] = DateTimeUtil.currentInstant()
})
} else {
refreshUserDetailsIfNeeded(selfUserId)
}
}.filterNotNull().flatMapMerge { encodedValue ->
val selfUserID: QualifiedIDEntity = Json.decodeFromString(encodedValue)
userDAO.observeUserDetailsByQualifiedID(selfUserID)
.filterNotNull()

val selfUser = getOrFetchSelfUserId()

return if (selfUser != null) {
userDAO.observeUserDetailsByQualifiedID(selfUser).filterNotNull()
.map(userMapper::fromUserDetailsEntityToSelfUser)
} else {
emptyFlow()
}
}

private suspend fun getOrFetchSelfUserId(): QualifiedIDEntity? {

var userId = metadataDAO.valueByKey(SELF_USER_ID_KEY)

if (userId == null) {

val logPrefix = "Observing self user before insertion"
kaliumLogger.w("$logPrefix: Triggering a fetch.")

fetchSelfUser().fold({ failure ->
kaliumLogger.e("""$logPrefix failed: {"failure":"$failure"}""")
}, {
kaliumLogger.i("$logPrefix: Succeeded")
userDetailsRefreshInstantCache[selfUserId] = DateTimeUtil.currentInstant()
})
} else {
refreshUserDetailsIfNeeded(selfUserId)
}

return metadataDAO.valueByKey(SELF_USER_ID_KEY)?.let { Json.decodeFromString(it) }
}

@OptIn(ExperimentalCoroutinesApi::class)
override suspend fun observeSelfUserWithTeam(): Flow<Pair<SelfUser, Team?>> {
return metadataDAO.valueByKeyFlow(SELF_USER_ID_KEY).filterNotNull().flatMapMerge { encodedValue ->
val selfUserID: QualifiedIDEntity = Json.decodeFromString(encodedValue)
userDAO.getUserDetailsWithTeamByQualifiedID(selfUserID)
.filterNotNull()

val selfUser = getOrFetchSelfUserId()

return if (selfUser != null) {
userDAO.getUserDetailsWithTeamByQualifiedID(selfUser).filterNotNull()
.map { (user, team) ->
userMapper.fromUserDetailsEntityToSelfUser(user) to team?.let { teamMapper.fromDaoModelToTeam(it) }
}
} else {
emptyFlow()
}
}

Expand All @@ -470,9 +499,12 @@ internal class UserDataSource internal constructor(
}.map { }
}

// TODO: replace the flow with selfUser and cache it
override suspend fun getSelfUser(): SelfUser? {
return observeSelfUser().firstOrNull()
return getOrFetchSelfUserId()?.let {
userDAO.getUserDetailsByQualifiedID(it)?.let {
userMapper.fromUserDetailsEntityToSelfUser(it)
}
}
}

override suspend fun observeAllKnownUsers(): Flow<Either<StorageFailure, List<OtherUser>>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,17 @@ import com.wire.kalium.logic.data.user.SelfUser
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.util.KaliumDispatcher
import com.wire.kalium.util.KaliumDispatcherImpl
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.withContext

/**
* This use case is responsible for retrieving the current user.
* fixme: Rename to ObserveSelfUserUseCase
*/
interface GetSelfUserUseCase {

/**
* @return a [Flow] of the current user [SelfUser]
* @return current user [SelfUser]
*/
suspend operator fun invoke(): Flow<SelfUser>
suspend operator fun invoke(): SelfUser?

}

Expand All @@ -43,7 +41,7 @@ internal class GetSelfUserUseCaseImpl internal constructor(
private val dispatcher: KaliumDispatcher = KaliumDispatcherImpl
) : GetSelfUserUseCase {

override suspend operator fun invoke(): Flow<SelfUser> = withContext(dispatcher.io) {
userRepository.observeSelfUser()
override suspend operator fun invoke(): SelfUser? = withContext(dispatcher.io) {
userRepository.getSelfUser()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Wire
* Copyright (C) 2024 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/

package com.wire.kalium.logic.feature.user

import com.wire.kalium.logic.data.user.SelfUser
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.util.KaliumDispatcher
import com.wire.kalium.util.KaliumDispatcherImpl
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.withContext

/**
* This use case is responsible for observing the current user.
*/
interface ObserveSelfUserUseCase {

/**
* @return a [Flow] of the current user [SelfUser]
*/
suspend operator fun invoke(): Flow<SelfUser>

}

internal class ObserveSelfUserUseCaseImpl internal constructor(
private val userRepository: UserRepository,
private val dispatcher: KaliumDispatcher = KaliumDispatcherImpl
) : ObserveSelfUserUseCase {

override suspend operator fun invoke(): Flow<SelfUser> = withContext(dispatcher.io) {
userRepository.observeSelfUser()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class UserScope internal constructor(
) {
private val validateUserHandleUseCase: ValidateUserHandleUseCase get() = ValidateUserHandleUseCaseImpl()
val getSelfUser: GetSelfUserUseCase get() = GetSelfUserUseCaseImpl(userRepository)
val observeSelfUser: ObserveSelfUserUseCase get() = ObserveSelfUserUseCaseImpl(userRepository)
val getSelfUserWithTeam: ObserveSelfUserWithTeamUseCase get() = ObserveSelfUserWithTeamUseCaseImpl(userRepository)
val observeUserInfo: ObserveUserInfoUseCase get() = ObserveUserInfoUseCaseImpl(userRepository, teamRepository)
val uploadUserAvatar: UploadUserAvatarUseCase get() = UploadUserAvatarUseCaseImpl(userRepository, assetRepository)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ class UserRepositoryTest {
selfUserIdChannel.send(TestUser.JSON_QUALIFIED_ID)
// given
val (arrangement, userRepository) = Arrangement()
.withSelfUserIdMetadataReturning(TestUser.JSON_QUALIFIED_ID)
.withSelfUserIdFlowMetadataReturning(selfUserIdChannel.consumeAsFlow())
.withRemoteGetSelfReturningUser()
.arrange()
Expand Down Expand Up @@ -386,6 +387,7 @@ class UserRepositoryTest {
@Test
fun givenAKnownSelfUser_whenGettingFromDbAndCacheExpiredOrNotPresent_thenShouldRefreshItsDataFromAPI() = runTest {
val (arrangement, userRepository) = Arrangement()
.withSelfUserIdMetadataReturning(TestUser.JSON_QUALIFIED_ID)
.withSelfUserIdFlowMetadataReturning(flowOf(TestUser.JSON_QUALIFIED_ID))
.withRemoteGetSelfReturningUser()
.withGetTeamMemberSuccess(TestTeam.memberDTO(TestUser.SELF.id.value))
Expand All @@ -409,6 +411,7 @@ class UserRepositoryTest {
@Test
fun givenAKnownSelfUser_whenGettingFromDbAndCacheValid_thenShouldNOTRefreshItsDataFromAPI() = runTest {
val (arrangement, userRepository) = Arrangement()
.withSelfUserIdMetadataReturning(TestUser.JSON_QUALIFIED_ID)
.withSelfUserIdFlowMetadataReturning(flowOf(TestUser.JSON_QUALIFIED_ID))
.withRemoteGetSelfReturningUser()
.withGetTeamMemberSuccess(TestTeam.memberDTO(TestUser.SELF.id.value))
Expand Down Expand Up @@ -890,6 +893,12 @@ class UserRepositoryTest {
}.returns(value)
}

suspend fun withSelfUserIdMetadataReturning(selfUserId: String?) = apply {
coEvery {
metadataDAO.valueByKey(eq(SELF_USER_ID_KEY))
}.returns(selfUserId)
}

suspend fun withSelfUserIdFlowMetadataReturning(selfUserIdStringFlow: Flow<String?>) = apply {
coEvery {
metadataDAO.valueByKeyFlow(eq(SELF_USER_ID_KEY))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package com.wire.kalium.persistence.dao

import com.wire.kalium.logger.obfuscateDomain
import com.wire.kalium.logger.obfuscateId
import com.wire.kalium.persistence.dao.ManagedByEntity.WIRE
import com.wire.kalium.persistence.dao.conversation.NameAndHandleEntity
import kotlinx.coroutines.flow.Flow
import kotlinx.datetime.Instant
Expand Down Expand Up @@ -258,6 +257,7 @@ interface UserDAO {
suspend fun observeUserDetailsByQualifiedID(qualifiedID: QualifiedIDEntity): Flow<UserDetailsEntity?>
suspend fun getUserDetailsWithTeamByQualifiedID(qualifiedID: QualifiedIDEntity): Flow<Pair<UserDetailsEntity, TeamEntity?>?>
suspend fun getUserMinimizedByQualifiedID(qualifiedID: QualifiedIDEntity): UserEntityMinimized?
suspend fun getUserDetailsByQualifiedID(qualifiedID: QualifiedIDEntity): UserDetailsEntity?
suspend fun getUsersDetailsByQualifiedIDList(qualifiedIDList: List<QualifiedIDEntity>): List<UserDetailsEntity>
suspend fun getUserDetailsByNameOrHandleOrEmailAndConnectionStates(
searchQuery: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,13 @@ class UserDAOImpl internal constructor(
}.executeAsList()
}

override suspend fun getUserDetailsByQualifiedID(qualifiedID: QualifiedIDEntity): UserDetailsEntity? =
withContext(queriesContext) {
userQueries.selectDetailsByQualifiedId(listOf(qualifiedID))
.executeAsOneOrNull()
?.let { mapper.toDetailsModel(it) }
}

override suspend fun getUsersDetailsByQualifiedIDList(qualifiedIDList: List<QualifiedIDEntity>): List<UserDetailsEntity> =
withContext(queriesContext) {
userQueries.selectDetailsByQualifiedId(qualifiedIDList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class PocIntegrationTest {
launch {
val userSession = initUserSession(createCoreLogic(mockedRequests))

val selfUserId = userSession.users.getSelfUser().first().id
val selfUserId = userSession.users.observeSelfUser().first().id
val selfClientId = userSession.clientIdProvider().getOrFail { throw IllegalStateException("No self client is registered") }
val targetUserId = UserId(value = selfUserId.value, domain = selfUserId.domain)

Expand Down
Loading