Return cached room members before fetching new ones, do it in batches (#2274)

* Use cached users from the Rust SDK.

Also lazy load received users by batches.

* Create `RoomMemberListFetcher` to wrap all the room member loading logic

* Ensure we clear `RoomMember` Rust references if the fetching coroutine is canceled
This commit is contained in:
Jorge Martin Espinosa 2024-01-23 18:23:20 +01:00 committed by GitHub
parent 196d8a2db6
commit da4825aa44
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 579 additions and 84 deletions

View file

@ -18,7 +18,7 @@ package io.element.android.libraries.matrix.impl.notification
import io.element.android.libraries.matrix.api.core.UserId
import io.element.android.libraries.matrix.api.notification.NotificationContent
import io.element.android.libraries.matrix.impl.room.RoomMemberMapper
import io.element.android.libraries.matrix.impl.room.member.RoomMemberMapper
import io.element.android.libraries.matrix.impl.timeline.item.event.EventMessageMapper
import org.matrix.rustcomponents.sdk.MessageLikeEventContent
import org.matrix.rustcomponents.sdk.StateEventContent

View file

@ -19,6 +19,7 @@ package io.element.android.libraries.matrix.impl.room
import io.element.android.libraries.matrix.api.room.CurrentUserMembership
import io.element.android.libraries.matrix.api.room.MatrixRoomInfo
import io.element.android.libraries.matrix.api.room.RoomNotificationMode
import io.element.android.libraries.matrix.impl.room.member.RoomMemberMapper
import io.element.android.libraries.matrix.impl.timeline.item.event.EventTimelineItemMapper
import kotlinx.collections.immutable.toImmutableList
import org.matrix.rustcomponents.sdk.use

View file

@ -18,7 +18,6 @@ package io.element.android.libraries.matrix.impl.room
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.core.coroutine.childScope
import io.element.android.libraries.core.coroutine.parallelMap
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.ProgressCallback
import io.element.android.libraries.matrix.api.core.RoomId
@ -39,7 +38,6 @@ import io.element.android.libraries.matrix.api.room.Mention
import io.element.android.libraries.matrix.api.room.MessageEventType
import io.element.android.libraries.matrix.api.room.StateEventType
import io.element.android.libraries.matrix.api.room.location.AssetType
import io.element.android.libraries.matrix.api.room.roomMembers
import io.element.android.libraries.matrix.api.room.roomNotificationSettings
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
import io.element.android.libraries.matrix.api.widget.MatrixWidgetDriver
@ -51,20 +49,17 @@ import io.element.android.libraries.matrix.impl.media.toMSC3246range
import io.element.android.libraries.matrix.impl.notificationsettings.RustNotificationSettingsService
import io.element.android.libraries.matrix.impl.poll.toInner
import io.element.android.libraries.matrix.impl.room.location.toInner
import io.element.android.libraries.matrix.impl.room.member.RoomMemberListFetcher
import io.element.android.libraries.matrix.impl.timeline.AsyncMatrixTimeline
import io.element.android.libraries.matrix.impl.timeline.RustMatrixTimeline
import io.element.android.libraries.matrix.impl.util.destroyAll
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
import io.element.android.libraries.matrix.impl.widget.RustWidgetDriver
import io.element.android.libraries.matrix.impl.widget.generateWidgetWebViewUrl
import io.element.android.libraries.sessionstorage.api.SessionData
import io.element.android.services.toolbox.api.systemclock.SystemClock
import kotlinx.collections.immutable.toImmutableList
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
@ -75,7 +70,6 @@ import org.matrix.rustcomponents.sdk.EventTimelineItem
import org.matrix.rustcomponents.sdk.RoomInfo
import org.matrix.rustcomponents.sdk.RoomInfoListener
import org.matrix.rustcomponents.sdk.RoomListItem
import org.matrix.rustcomponents.sdk.RoomMember
import org.matrix.rustcomponents.sdk.RoomMessageEventContentWithoutRelation
import org.matrix.rustcomponents.sdk.SendAttachmentJoinHandle
import org.matrix.rustcomponents.sdk.WidgetCapabilities
@ -125,8 +119,8 @@ class RustMatrixRoom(
private val roomMembersDispatcher = coroutineDispatchers.io.limitedParallelism(8)
private val roomCoroutineScope = sessionCoroutineScope.childScope(coroutineDispatchers.main, "RoomScope-$roomId")
private val _membersStateFlow = MutableStateFlow<MatrixRoomMembersState>(MatrixRoomMembersState.Unknown)
private val _syncUpdateFlow = MutableStateFlow(0L)
private val roomMemberListFetcher = RoomMemberListFetcher(innerRoom, roomMembersDispatcher)
private val _roomNotificationSettingsStateFlow = MutableStateFlow<MatrixRoomNotificationSettingsState>(MatrixRoomNotificationSettingsState.Unknown)
override val roomNotificationSettingsStateFlow: StateFlow<MatrixRoomNotificationSettingsState> = _roomNotificationSettingsStateFlow
@ -135,7 +129,7 @@ class RustMatrixRoom(
_syncUpdateFlow.value = systemClock.epochMillis()
}
override val membersStateFlow: StateFlow<MatrixRoomMembersState> = _membersStateFlow.asStateFlow()
override val membersStateFlow: StateFlow<MatrixRoomMembersState> = roomMemberListFetcher.membersFlow
override val syncUpdateFlow: StateFlow<Long> = _syncUpdateFlow.asStateFlow()
@ -192,35 +186,7 @@ class RustMatrixRoom(
override val activeMemberCount: Long
get() = innerRoom.activeMembersCount().toLong()
override suspend fun updateMembers(): Result<Unit> = withContext(roomMembersDispatcher) {
val currentState = _membersStateFlow.value
val currentMembers = currentState.roomMembers()?.toImmutableList()
_membersStateFlow.value = MatrixRoomMembersState.Pending(prevRoomMembers = currentMembers)
var rustMembers: List<RoomMember>? = null
try {
rustMembers = innerRoom.members().use { membersIterator ->
buildList {
while (true) {
// Loading the whole membersIterator as a stop-gap measure.
// We should probably implement some sort of paging in the future.
ensureActive()
addAll(membersIterator.nextChunk(1000u) ?: break)
}
}
}
val mappedMembers = rustMembers.parallelMap(RoomMemberMapper::map)
_membersStateFlow.value = MatrixRoomMembersState.Ready(mappedMembers.toImmutableList())
Result.success(Unit)
} catch (exception: CancellationException) {
_membersStateFlow.value = MatrixRoomMembersState.Error(prevRoomMembers = currentMembers, failure = exception)
throw exception
} catch (exception: Exception) {
_membersStateFlow.value = MatrixRoomMembersState.Error(prevRoomMembers = currentMembers, failure = exception)
Result.failure(exception)
} finally {
rustMembers?.destroyAll()
}
}
override suspend fun updateMembers() = roomMemberListFetcher.fetchRoomMembers()
override suspend fun userDisplayName(userId: UserId): Result<String?> = withContext(roomDispatcher) {
runCatching {

View file

@ -0,0 +1,129 @@
/*
* Copyright (c) 2024 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.element.android.libraries.matrix.impl.room.member
import io.element.android.libraries.core.coroutine.parallelMap
import io.element.android.libraries.matrix.api.room.MatrixRoomMembersState
import io.element.android.libraries.matrix.api.room.roomMembers
import io.element.android.libraries.matrix.impl.util.destroyAll
import kotlinx.collections.immutable.toImmutableList
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.RoomInterface
import org.matrix.rustcomponents.sdk.RoomMembersIterator
import org.matrix.rustcomponents.sdk.use
import timber.log.Timber
import kotlin.coroutines.cancellation.CancellationException
import kotlin.coroutines.coroutineContext
/**
* This class fetches the room members for a given room in a 'paginated' way, and taking into account previous cached values.
*/
internal class RoomMemberListFetcher(
private val room: RoomInterface,
private val dispatcher: CoroutineDispatcher,
private val pageSize: Int = 1000,
) {
private val updatedRoomMemberMutex = Mutex()
private val roomId = room.id()
private val _membersFlow = MutableStateFlow<MatrixRoomMembersState>(MatrixRoomMembersState.Unknown)
val membersFlow: StateFlow<MatrixRoomMembersState> = _membersFlow
/**
* Fetches the room members for the given room.
* It will emit the cached members first, and then the updated members in batches of [pageSize] items, through [membersFlow].
* @param withCache Whether to load the cached members first. Defaults to true.
*/
suspend fun fetchRoomMembers(withCache: Boolean = true) {
if (updatedRoomMemberMutex.isLocked) {
Timber.i("Room members are already being updated for room $roomId")
return
}
updatedRoomMemberMutex.withLock {
withContext(dispatcher) {
// Load cached members as fallback and to get faster results
if (withCache) {
if (_membersFlow.value !is MatrixRoomMembersState.Ready) {
fetchCachedRoomMembers()
} else {
Timber.i("No need to load cached members found for room $roomId")
}
}
val prevRoomMembers = (_membersFlow.value as? MatrixRoomMembersState.Ready)?.roomMembers?.toImmutableList()
_membersFlow.value = MatrixRoomMembersState.Pending(prevRoomMembers = prevRoomMembers)
try {
// Start loading new members
parseAndEmitMembers(room.members())
} catch (exception: CancellationException) {
Timber.d("Cancelled loading updated members for room $roomId")
throw exception
} catch (exception: Exception) {
Timber.e(exception, "Failed to load updated members for room $roomId")
_membersFlow.value = MatrixRoomMembersState.Error(exception, prevRoomMembers)
}
}
}
}
internal suspend fun fetchCachedRoomMembers() = withContext(dispatcher) {
Timber.i("Loading cached members for room $roomId")
try {
val iterator = room.membersNoSync()
parseAndEmitMembers(iterator)
} catch (exception: CancellationException) {
Timber.d("Cancelled loading cached members for room $roomId")
throw exception
} catch (exception: Exception) {
Timber.e(exception, "Failed to load cached members for room $roomId")
_membersFlow.value = MatrixRoomMembersState.Error(exception, _membersFlow.value.roomMembers()?.toImmutableList())
}
}
private suspend fun parseAndEmitMembers(roomMembersIterator: RoomMembersIterator) {
roomMembersIterator.use { iterator ->
val results = buildList {
while (true) {
// Loading the whole membersIterator as a stop-gap measure.
// We should probably implement some sort of paging in the future.
coroutineContext.ensureActive()
val chunk = iterator.nextChunk(pageSize.toUInt())
val members = try {
// Load next chunk. If null (no more items), exit the loop
chunk?.parallelMap(RoomMemberMapper::map) ?: break
} finally {
// Make sure we clear all member references
chunk?.destroyAll()
}
addAll(members)
Timber.i("Emitting first $size members for room $roomId")
_membersFlow.value = MatrixRoomMembersState.Ready(toImmutableList())
}
}
if (results.isEmpty()) {
_membersFlow.value = MatrixRoomMembersState.Ready(results.toImmutableList())
}
}
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 New Vector Ltd
* Copyright (c) 2024 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.element.android.libraries.matrix.impl.room
package io.element.android.libraries.matrix.impl.room.member
import io.element.android.libraries.matrix.api.core.UserId
import io.element.android.libraries.matrix.api.room.RoomMember

View file

@ -19,7 +19,7 @@ package io.element.android.libraries.matrix.impl.roomlist
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.roomlist.RoomSummaryDetails
import io.element.android.libraries.matrix.impl.notificationsettings.RoomNotificationSettingsMapper
import io.element.android.libraries.matrix.impl.room.RoomMemberMapper
import io.element.android.libraries.matrix.impl.room.member.RoomMemberMapper
import io.element.android.libraries.matrix.impl.room.message.RoomMessageFactory
import org.matrix.rustcomponents.sdk.RoomInfo
import org.matrix.rustcomponents.sdk.use

View file

@ -0,0 +1,303 @@
/*
* Copyright (c) 2024 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.element.android.libraries.matrix.impl.room.member
import app.cash.turbine.test
import com.google.common.truth.Truth.assertThat
import io.element.android.libraries.matrix.api.core.UserId
import io.element.android.libraries.matrix.api.room.MatrixRoomMembersState
import io.element.android.libraries.matrix.api.room.roomMembers
import io.element.android.libraries.matrix.test.A_ROOM_ID
import io.element.android.libraries.matrix.test.A_USER_ID
import io.element.android.libraries.matrix.test.A_USER_ID_2
import io.element.android.libraries.matrix.test.A_USER_ID_3
import io.element.android.libraries.matrix.test.A_USER_ID_4
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.test.runTest
import org.junit.Test
import org.matrix.rustcomponents.sdk.MembershipState
import org.matrix.rustcomponents.sdk.NoPointer
import org.matrix.rustcomponents.sdk.Room
import org.matrix.rustcomponents.sdk.RoomMember
import org.matrix.rustcomponents.sdk.RoomMembersIterator
class RoomMemberListFetcherTest {
@Test
fun `fetchCachedRoomMembers - emits cached members, if any`() = runTest {
val room = FakeRustRoom(getMembersNoSync = {
FakeRoomMembersIterator(
listOf(
FakeRustRoomMember(A_USER_ID),
FakeRustRoomMember(A_USER_ID_2),
FakeRustRoomMember(A_USER_ID_3),
)
)
})
val fetcher = RoomMemberListFetcher(room, Dispatchers.Default)
fetcher.membersFlow.test {
assertThat(awaitItem()).isInstanceOf(MatrixRoomMembersState.Unknown::class.java)
fetcher.fetchCachedRoomMembers()
val readyItem = awaitItem()
assertThat(readyItem).isInstanceOf(MatrixRoomMembersState.Ready::class.java)
assertThat((readyItem as? MatrixRoomMembersState.Ready)?.roomMembers?.size).isEqualTo(3)
}
}
@Test
fun `fetchCachedRoomMembers - emits empty list, if no members exist`() = runTest {
val room = FakeRustRoom(getMembersNoSync = {
FakeRoomMembersIterator(emptyList())
})
val fetcher = RoomMemberListFetcher(room, Dispatchers.Default)
fetcher.membersFlow.test {
fetcher.fetchCachedRoomMembers()
assertThat(awaitItem()).isInstanceOf(MatrixRoomMembersState.Unknown::class.java)
assertThat(awaitItem().roomMembers()).isEmpty()
}
}
@Test
fun `fetchCachedRoomMembers - emits Error on error found`() = runTest {
val room = FakeRustRoom(getMembersNoSync = {
error("Some unexpected issue")
})
val fetcher = RoomMemberListFetcher(room, Dispatchers.Default)
fetcher.membersFlow.test {
fetcher.fetchCachedRoomMembers()
assertThat(awaitItem()).isInstanceOf(MatrixRoomMembersState.Unknown::class.java)
assertThat(awaitItem()).isInstanceOf(MatrixRoomMembersState.Error::class.java)
}
}
@Test
fun `fetchCachedRoomMembers - emits items using page size`() = runTest {
val room = FakeRustRoom(getMembersNoSync = {
FakeRoomMembersIterator(
listOf(
FakeRustRoomMember(A_USER_ID),
FakeRustRoomMember(A_USER_ID_2),
FakeRustRoomMember(A_USER_ID_3),
)
)
})
val fetcher = RoomMemberListFetcher(room, Dispatchers.Default, pageSize = 2)
fetcher.membersFlow.test {
fetcher.fetchCachedRoomMembers()
assertThat(awaitItem()).isInstanceOf(MatrixRoomMembersState.Unknown::class.java)
assertThat((awaitItem() as? MatrixRoomMembersState.Ready)?.roomMembers?.size).isEqualTo(2)
assertThat((awaitItem() as? MatrixRoomMembersState.Ready)?.roomMembers?.size).isEqualTo(3)
ensureAllEventsConsumed()
}
}
@Test
fun `fetchRoomMembers - with 'withCache' set to false emits only new members, if any`() = runTest {
val room = FakeRustRoom(getMembers = {
FakeRoomMembersIterator(
listOf(
FakeRustRoomMember(A_USER_ID),
FakeRustRoomMember(A_USER_ID_2),
FakeRustRoomMember(A_USER_ID_3),
)
)
})
val fetcher = RoomMemberListFetcher(room, Dispatchers.Default)
fetcher.membersFlow.test {
fetcher.fetchRoomMembers(withCache = false)
assertThat(awaitItem()).isInstanceOf(MatrixRoomMembersState.Unknown::class.java)
assertThat(awaitItem()).isInstanceOf(MatrixRoomMembersState.Pending::class.java)
assertThat((awaitItem() as? MatrixRoomMembersState.Ready)?.roomMembers?.size).isEqualTo(3)
}
}
@Test
fun `fetchRoomMembers - on error it emits an Error item`() = runTest {
val room = FakeRustRoom(getMembers = { error("An unexpected error") })
val fetcher = RoomMemberListFetcher(room, Dispatchers.Default)
fetcher.membersFlow.test {
fetcher.fetchRoomMembers(withCache = false)
assertThat(awaitItem()).isInstanceOf(MatrixRoomMembersState.Unknown::class.java)
assertThat(awaitItem()).isInstanceOf(MatrixRoomMembersState.Pending::class.java)
assertThat(awaitItem()).isInstanceOf(MatrixRoomMembersState.Error::class.java)
}
}
@Test
fun `fetchRoomMembers - with 'withCache' returns cached items first, then new ones`() = runTest {
val room = FakeRustRoom(
getMembersNoSync = {
FakeRoomMembersIterator(listOf(FakeRustRoomMember(A_USER_ID_4)))
},
getMembers = {
FakeRoomMembersIterator(
listOf(
FakeRustRoomMember(A_USER_ID),
FakeRustRoomMember(A_USER_ID_2),
FakeRustRoomMember(A_USER_ID_3),
)
)
}
)
val fetcher = RoomMemberListFetcher(room, Dispatchers.Default)
fetcher.membersFlow.test {
fetcher.fetchRoomMembers(withCache = true)
// Initial
assertThat(awaitItem()).isInstanceOf(MatrixRoomMembersState.Unknown::class.java)
// Loaded cached
awaitItem().let { cached ->
assertThat(cached).isInstanceOf(MatrixRoomMembersState.Ready::class.java)
assertThat(cached.roomMembers()).hasSize(1)
}
// Start loading new
assertThat(awaitItem()).isInstanceOf(MatrixRoomMembersState.Pending::class.java)
awaitItem().let { ready ->
assertThat(ready).isInstanceOf(MatrixRoomMembersState.Ready::class.java)
assertThat(ready.roomMembers()).hasSize(3)
}
}
}
@Test
fun `fetchRoomMembers - with 'withCache' skips cache if there is already a ready state`() = runTest {
val room = FakeRustRoom(
getMembersNoSync = {
FakeRoomMembersIterator(listOf(FakeRustRoomMember(A_USER_ID_4)))
},
getMembers = {
FakeRoomMembersIterator(
listOf(
FakeRustRoomMember(A_USER_ID),
FakeRustRoomMember(A_USER_ID_2),
FakeRustRoomMember(A_USER_ID_3),
)
)
}
)
val fetcher = RoomMemberListFetcher(room, Dispatchers.Default)
// Set a ready state
fetcher.fetchRoomMembers(withCache = false)
fetcher.membersFlow.test {
// Start loading new members
fetcher.fetchRoomMembers(withCache = true)
// Previous ready state
assertThat(awaitItem()).isInstanceOf(MatrixRoomMembersState.Ready::class.java)
// New pending state
assertThat(awaitItem()).isInstanceOf(MatrixRoomMembersState.Pending::class.java)
// New ready state
awaitItem().let { ready ->
assertThat(ready).isInstanceOf(MatrixRoomMembersState.Ready::class.java)
assertThat(ready.roomMembers()).hasSize(3)
}
}
}
}
class FakeRustRoom(
private val getMembers: () -> RoomMembersIterator = { FakeRoomMembersIterator() },
private val getMembersNoSync: () -> RoomMembersIterator = { FakeRoomMembersIterator() },
) : Room(NoPointer) {
override fun id(): String {
return A_ROOM_ID.value
}
override suspend fun members(): RoomMembersIterator {
return getMembers()
}
override suspend fun membersNoSync(): RoomMembersIterator {
return getMembersNoSync()
}
override fun close() {
// No-op
}
}
class FakeRoomMembersIterator(
private var members: List<RoomMember>? = null
) : RoomMembersIterator(NoPointer) {
override fun len(): UInt {
return members?.size?.toUInt() ?: 0u
}
override fun nextChunk(chunkSize: UInt): List<RoomMember>? {
if (members?.isEmpty() == true) {
return null
}
return members?.let {
val result = it.take(chunkSize.toInt())
members = it.subList(result.size, it.size)
result
}
}
}
class FakeRustRoomMember(
private val userId: UserId,
private val displayName: String? = null,
private val avatarUrl: String? = null,
private val membership: MembershipState = MembershipState.JOIN,
private val isNameAmbiguous: Boolean = false,
private val powerLevel: Long = 0L,
) : RoomMember(NoPointer) {
override fun userId(): String {
return userId.value
}
override fun displayName(): String? {
return displayName
}
override fun avatarUrl(): String? {
return avatarUrl
}
override fun membership(): MembershipState {
return membership
}
override fun isNameAmbiguous(): Boolean {
return isNameAmbiguous
}
override fun powerLevel(): Long {
return powerLevel
}
override fun normalizedPowerLevel(): Long {
return powerLevel
}
override fun isIgnored(): Boolean {
return false
}
}