From c508c8bdd07c152bef8857873a61156f9c0567b7 Mon Sep 17 00:00:00 2001 From: ganfra Date: Fri, 12 Jul 2024 18:11:36 +0200 Subject: [PATCH 1/5] Performance : subscribe to timeline items only when necessary --- .../matrix/impl/timeline/RustTimeline.kt | 127 +++++++----------- .../impl/timeline/TimelineItemsSubscriber.kt | 108 +++++++++++++++ 2 files changed, 156 insertions(+), 79 deletions(-) create mode 100644 libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt index 9cdb8d1366..9f48e59995 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt @@ -56,8 +56,6 @@ import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow @@ -67,32 +65,28 @@ import kotlinx.coroutines.flow.getAndUpdate import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import org.matrix.rustcomponents.sdk.FormattedBody import org.matrix.rustcomponents.sdk.MessageFormat import org.matrix.rustcomponents.sdk.RoomMessageEventContentWithoutRelation import org.matrix.rustcomponents.sdk.SendAttachmentJoinHandle -import org.matrix.rustcomponents.sdk.TimelineChange -import org.matrix.rustcomponents.sdk.TimelineDiff -import org.matrix.rustcomponents.sdk.TimelineItem import org.matrix.rustcomponents.sdk.messageEventContentFromHtml import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown import org.matrix.rustcomponents.sdk.use import timber.log.Timber -import uniffi.matrix_sdk_ui.EventItemOrigin import uniffi.matrix_sdk_ui.LiveBackPaginationStatus import java.io.File import java.util.Date import java.util.concurrent.atomic.AtomicBoolean import org.matrix.rustcomponents.sdk.Timeline as InnerTimeline -private const val INITIAL_MAX_SIZE = 50 private const val PAGINATION_SIZE = 50 class RustTimeline( private val inner: InnerTimeline, - isLive: Boolean, + private val isLive: Boolean, systemClock: SystemClock, roomCoroutineScope: CoroutineScope, isKeyBackupEnabled: Boolean, @@ -100,7 +94,7 @@ class RustTimeline( private val dispatcher: CoroutineDispatcher, lastLoginTimestamp: Date?, private val roomContentForwarder: RoomContentForwarder, - private val onNewSyncedEvent: () -> Unit, + onNewSyncedEvent: () -> Unit, ) : Timeline { private val initLatch = CompletableDeferred() private val isInit = AtomicBoolean(false) @@ -108,20 +102,9 @@ class RustTimeline( private val _timelineItems: MutableStateFlow> = MutableStateFlow(emptyList()) - private val encryptedHistoryPostProcessor = TimelineEncryptedHistoryPostProcessor( - lastLoginTimestamp = lastLoginTimestamp, - isRoomEncrypted = matrixRoom.isEncrypted, - isKeyBackupEnabled = isKeyBackupEnabled, - dispatcher = dispatcher, - ) - - private val roomBeginningPostProcessor = RoomBeginningPostProcessor() - private val loadingIndicatorsPostProcessor = LoadingIndicatorsPostProcessor(systemClock) - private val lastForwardIndicatorsPostProcessor = LastForwardIndicatorsPostProcessor(isLive) - private val timelineEventContentMapper = TimelineEventContentMapper() private val inReplyToMapper = InReplyToMapper(timelineEventContentMapper) - private val timelineItemFactory = MatrixTimelineItemMapper( + private val timelineItemMapper = MatrixTimelineItemMapper( fetchDetailsForEvent = this::fetchDetailsForEvent, roomCoroutineScope = roomCoroutineScope, virtualTimelineItemMapper = VirtualTimelineItemMapper(), @@ -129,11 +112,29 @@ class RustTimeline( contentMapper = timelineEventContentMapper ) ) - private val timelineDiffProcessor = MatrixTimelineDiffProcessor( timelineItems = _timelineItems, - timelineItemFactory = timelineItemFactory, + timelineItemFactory = timelineItemMapper, ) + private val encryptedHistoryPostProcessor = TimelineEncryptedHistoryPostProcessor( + lastLoginTimestamp = lastLoginTimestamp, + isRoomEncrypted = matrixRoom.isEncrypted, + isKeyBackupEnabled = isKeyBackupEnabled, + dispatcher = dispatcher, + ) + private val timelineItemsSubscriber = TimelineItemsSubscriber( + timeline = inner, + roomCoroutineScope = roomCoroutineScope, + timelineDiffProcessor = timelineDiffProcessor, + initLatch = initLatch, + isInit = isInit, + dispatcher = dispatcher, + onNewSyncedEvent = onNewSyncedEvent, + ) + + private val roomBeginningPostProcessor = RoomBeginningPostProcessor() + private val loadingIndicatorsPostProcessor = LoadingIndicatorsPostProcessor(systemClock) + private val lastForwardIndicatorsPostProcessor = LastForwardIndicatorsPostProcessor(isLive) private val backPaginationStatus = MutableStateFlow( Timeline.PaginationStatus(isPaginating = false, hasMoreToLoad = true) @@ -145,36 +146,28 @@ class RustTimeline( init { roomCoroutineScope.launch(dispatcher) { - inner.timelineDiffFlow() - .onEach { diffs -> - if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) { - onNewSyncedEvent() - } - postDiffs(diffs) - } - .launchIn(this) - - launch { - fetchMembers() - } - + fetchMembers() if (isLive) { // When timeline is live, we need to listen to the back pagination status as // sdk can automatically paginate backwards. - inner.liveBackPaginationStatus() - .onEach { backPaginationStatus -> - updatePaginationStatus(Timeline.PaginationDirection.BACKWARDS) { - when (backPaginationStatus) { - is LiveBackPaginationStatus.Idle -> it.copy(isPaginating = false, hasMoreToLoad = !backPaginationStatus.hitStartOfTimeline) - is LiveBackPaginationStatus.Paginating -> it.copy(isPaginating = true, hasMoreToLoad = true) - } - } - } - .launchIn(this) + registerBackPaginationStatusListener() } } } + private fun CoroutineScope.registerBackPaginationStatusListener() { + inner.liveBackPaginationStatus() + .onEach { backPaginationStatus -> + updatePaginationStatus(Timeline.PaginationDirection.BACKWARDS) { + when (backPaginationStatus) { + is LiveBackPaginationStatus.Idle -> it.copy(isPaginating = false, hasMoreToLoad = !backPaginationStatus.hitStartOfTimeline) + is LiveBackPaginationStatus.Paginating -> it.copy(isPaginating = true, hasMoreToLoad = true) + } + } + } + .launchIn(this) + } + override val membershipChangeEventReceived: Flow = timelineDiffProcessor.membershipChangeEventReceived override suspend fun sendReadReceipt(eventId: EventId, receiptType: ReceiptType): Result { @@ -248,13 +241,15 @@ class RustTimeline( // Keep lastForwardIndicatorsPostProcessor last .let { items -> lastForwardIndicatorsPostProcessor.process(items) } } + }.onStart { + timelineItemsSubscriber.subscribeIfNeeded() } override fun close() { inner.close() } - private suspend fun fetchMembers() = withContext(dispatcher) { + private fun CoroutineScope.fetchMembers() = launch(dispatcher) { initLatch.await() try { inner.fetchMembers() @@ -263,32 +258,6 @@ class RustTimeline( } } - private suspend fun postItems(items: List) = coroutineScope { - // Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap. - items.chunked(INITIAL_MAX_SIZE).reversed().forEach { - ensureActive() - timelineDiffProcessor.postItems(it) - } - isInit.set(true) - initLatch.complete(Unit) - } - - private suspend fun postDiffs(diffs: List) { - val diffsToProcess = diffs.toMutableList() - if (!isInit.get()) { - val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET } - if (resetDiff != null) { - // Keep using the postItems logic so we can post the timelineItems asap. - postItems(resetDiff.reset() ?: emptyList()) - diffsToProcess.remove(resetDiff) - } - } - initLatch.await() - if (diffsToProcess.isNotEmpty()) { - timelineDiffProcessor.postDiffs(diffsToProcess) - } - } - override suspend fun sendMessage(body: String, htmlBody: String?, mentions: List): Result = withContext(dispatcher) { messageEventContentFromParts(body, htmlBody).withMentions(mentions.map()).use { content -> runCatching { @@ -550,12 +519,6 @@ class RustTimeline( } } - private suspend fun fetchDetailsForEvent(eventId: EventId): Result { - return runCatching { - inner.fetchDetailsForEvent(eventId.value) - } - } - override suspend fun loadReplyDetails(eventId: EventId): InReplyTo = withContext(dispatcher) { val timelineItem = _timelineItems.value.firstOrNull { timelineItem -> timelineItem is MatrixTimelineItem.Event && timelineItem.eventId == eventId @@ -572,4 +535,10 @@ class RustTimeline( inner.loadReplyDetails(eventId.value).use(inReplyToMapper::map) } } + + private suspend fun fetchDetailsForEvent(eventId: EventId): Result { + return runCatching { + inner.fetchDetailsForEvent(eventId.value) + } + } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt new file mode 100644 index 0000000000..0205ac20e5 --- /dev/null +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2024 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.element.android.libraries.matrix.impl.timeline + +import io.element.android.libraries.core.coroutine.childScope +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.cancelChildren +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import org.matrix.rustcomponents.sdk.Timeline +import org.matrix.rustcomponents.sdk.TimelineChange +import org.matrix.rustcomponents.sdk.TimelineDiff +import org.matrix.rustcomponents.sdk.TimelineItem +import uniffi.matrix_sdk_ui.EventItemOrigin +import java.util.concurrent.atomic.AtomicBoolean + +private const val INITIAL_MAX_SIZE = 50 + +/** + * This class is responsible for subscribing to a timeline and post the items/diffs to the timelineDiffProcessor. + * It will also trigger a callback when a new synced event is received. + * It will also handle the initial items and make sure they are posted before any diff. + * When closing the room subscription, it will also unsubscribe automatically. + */ +internal class TimelineItemsSubscriber( + roomCoroutineScope: CoroutineScope, + dispatcher: CoroutineDispatcher, + private val timeline: Timeline, + private val timelineDiffProcessor: MatrixTimelineDiffProcessor, + private val initLatch: CompletableDeferred, + private val isInit: AtomicBoolean, + private val onNewSyncedEvent: () -> Unit, +) { + private var subscriptionCount = 0 + private val mutex = Mutex() + + private val coroutineScope = roomCoroutineScope.childScope(dispatcher, "TimelineItemsSubscriber") + + suspend fun subscribeIfNeeded() = mutex.withLock { + if (subscriptionCount == 0) { + timeline.timelineDiffFlow() + .onEach { diffs -> + if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) { + onNewSyncedEvent() + } + postDiffs(diffs) + } + .launchIn(coroutineScope) + } + subscriptionCount++ + } + + suspend fun unsubscribeIfNeeded() = mutex.withLock { + when (subscriptionCount) { + 0 -> return@withLock + 1 -> { + coroutineScope.coroutineContext.cancelChildren() + } + } + subscriptionCount-- + } + + private suspend fun postItems(items: List) = coroutineScope { + // Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap. + items.chunked(INITIAL_MAX_SIZE).reversed().forEach { + ensureActive() + timelineDiffProcessor.postItems(it) + } + isInit.set(true) + initLatch.complete(Unit) + } + + private suspend fun postDiffs(diffs: List) { + val diffsToProcess = diffs.toMutableList() + if (!isInit.get()) { + val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET } + if (resetDiff != null) { + // Keep using the postItems logic so we can post the timelineItems asap. + postItems(resetDiff.reset() ?: emptyList()) + diffsToProcess.remove(resetDiff) + } + } + initLatch.await() + if (diffsToProcess.isNotEmpty()) { + timelineDiffProcessor.postDiffs(diffsToProcess) + } + } +} From ef12408b6ea4758a3d87cc8a87c410a4c708650f Mon Sep 17 00:00:00 2001 From: ganfra Date: Wed, 17 Jul 2024 14:45:14 +0200 Subject: [PATCH 2/5] Performance : add cache on roomListItem and fullRoom --- .../matrix/impl/room/RustMatrixRoom.kt | 5 +- .../matrix/impl/room/RustRoomFactory.kt | 95 +++++++++++++------ .../impl/timeline/MatrixTimelineItemMapper.kt | 4 +- .../impl/timeline/RoomTimelineExtensions.kt | 8 +- .../matrix/impl/timeline/RustTimeline.kt | 23 +++-- .../impl/timeline/TimelineItemsSubscriber.kt | 14 ++- 6 files changed, 98 insertions(+), 51 deletions(-) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt index 463defb284..d23e5efb2a 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt @@ -196,8 +196,6 @@ class RustMatrixRoom( override fun destroy() { roomCoroutineScope.cancel() liveTimeline.close() - innerRoom.destroy() - roomListItem.destroy() } override val displayName: String @@ -627,12 +625,13 @@ class RustMatrixRoom( isLive: Boolean, onNewSyncedEvent: () -> Unit = {}, ): Timeline { + val timelineCoroutineScope = roomCoroutineScope.childScope(coroutineDispatchers.main, "TimelineScope-$roomId-$timeline") return RustTimeline( isKeyBackupEnabled = isKeyBackupEnabled, isLive = isLive, matrixRoom = this, systemClock = systemClock, - roomCoroutineScope = roomCoroutineScope, + coroutineScope = timelineCoroutineScope, dispatcher = roomDispatcher, lastLoginTimestamp = sessionData.loginTimestamp, onNewSyncedEvent = onNewSyncedEvent, diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt index 8d3d056aab..a69cbc67d3 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt @@ -16,6 +16,7 @@ package io.element.android.libraries.matrix.impl.room +import androidx.collection.lruCache import io.element.android.appconfig.TimelineConfig import io.element.android.libraries.core.coroutine.CoroutineDispatchers import io.element.android.libraries.matrix.api.core.RoomId @@ -41,6 +42,8 @@ import org.matrix.rustcomponents.sdk.TimelineEventTypeFilter import timber.log.Timber import org.matrix.rustcomponents.sdk.RoomListService as InnerRoomListService +private const val CACHE_SIZE = 16 + class RustRoomFactory( private val sessionId: SessionId, private val notificationSettingsService: NotificationSettingsService, @@ -55,8 +58,23 @@ class RustRoomFactory( private val getSessionData: suspend () -> SessionData, ) { @OptIn(ExperimentalCoroutinesApi::class) - private val createRoomDispatcher = dispatchers.io.limitedParallelism(1) + private val dispatcher = dispatchers.io.limitedParallelism(1) private val mutex = Mutex() + private var isDestroyed: Boolean = false + + private data class RustRoomObjects( + val roomListItem: RoomListItem, + val fullRoom: Room, + ) + + private val cache = lruCache( + maxSize = CACHE_SIZE, + onEntryRemoved = { evicted, roomId, oldRoom, _ -> + Timber.d("On room removed from cache: $roomId, evicted: $evicted") + oldRoom.roomListItem.close() + oldRoom.fullRoom.close() + } + ) private val matrixRoomInfoMapper = MatrixRoomInfoMapper() @@ -70,30 +88,41 @@ class RustRoomFactory( ) } - suspend fun create(roomId: RoomId): MatrixRoom? = withContext(createRoomDispatcher) { - var cachedPairOfRoom: Pair? - mutex.withLock { - // Check if already in memory... - cachedPairOfRoom = pairOfRoom(roomId) - if (cachedPairOfRoom == null) { - // ... otherwise, lets wait for the SS to load all rooms and check again. - roomListService.allRooms.awaitLoaded() - cachedPairOfRoom = pairOfRoom(roomId) + suspend fun destroy() { + withContext(dispatcher) { + mutex.withLock { + Timber.d("Destroying room factory") + cache.evictAll() + isDestroyed = true } } - if (cachedPairOfRoom == null) { - Timber.d("No room found for $roomId") - return@withContext null - } - cachedPairOfRoom?.let { (roomListItem, fullRoom) -> + } + + suspend fun create(roomId: RoomId): MatrixRoom? = withContext(dispatcher) { + mutex.withLock { + if (isDestroyed) { + Timber.d("Room factory is destroyed, returning null for $roomId") + return@withContext null + } + var roomObjects: RustRoomObjects? = getRoomObjects(roomId) + if (roomObjects == null) { + // ... otherwise, lets wait for the SS to load all rooms and check again. + roomListService.allRooms.awaitLoaded() + roomObjects = getRoomObjects(roomId) + } + if (roomObjects == null) { + Timber.d("No room found for $roomId, returning null") + return@withContext null + } + val liveTimeline = roomObjects.fullRoom.timeline() RustMatrixRoom( sessionId = sessionId, isKeyBackupEnabled = isKeyBackupEnabled(), - roomListItem = roomListItem, - innerRoom = fullRoom, - innerTimeline = fullRoom.timeline(), - notificationSettingsService = notificationSettingsService, + roomListItem = roomObjects.roomListItem, + innerRoom = roomObjects.fullRoom, + innerTimeline = liveTimeline, sessionCoroutineScope = sessionCoroutineScope, + notificationSettingsService = notificationSettingsService, coroutineDispatchers = dispatchers, systemClock = systemClock, roomContentForwarder = roomContentForwarder, @@ -104,20 +133,28 @@ class RustRoomFactory( } } - private suspend fun pairOfRoom(roomId: RoomId): Pair? { - val cachedRoomListItem = innerRoomListService.roomOrNull(roomId.value) + private suspend fun getRoomObjects(roomId: RoomId): RustRoomObjects? { + cache[roomId]?.let { + Timber.d("Room found in cache for $roomId") + return it + } + val roomListItem = innerRoomListService.roomOrNull(roomId.value) + if (roomListItem == null) { + Timber.d("Room not found for $roomId") + return null + } val fullRoom = try { - cachedRoomListItem?.fullRoomWithTimeline(filter = eventFilters) + roomListItem.fullRoomWithTimeline(filter = eventFilters) } catch (e: RoomListException) { Timber.e(e, "Failed to get full room with timeline for $roomId") - null + return null } - return if (cachedRoomListItem == null || fullRoom == null) { - Timber.d("No room cached for $roomId") - null - } else { - Timber.d("Found room cached for $roomId") - Pair(cachedRoomListItem, fullRoom) + Timber.d("Got full room with timeline for $roomId") + return RustRoomObjects( + roomListItem = roomListItem, + fullRoom = fullRoom, + ).also { + cache.put(roomId, it) } } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineItemMapper.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineItemMapper.kt index c8cb4eef1c..d38c86f0f9 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineItemMapper.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineItemMapper.kt @@ -26,7 +26,7 @@ import org.matrix.rustcomponents.sdk.TimelineItem class MatrixTimelineItemMapper( private val fetchDetailsForEvent: suspend (EventId) -> Result, - private val roomCoroutineScope: CoroutineScope, + private val coroutineScope: CoroutineScope, private val virtualTimelineItemMapper: VirtualTimelineItemMapper = VirtualTimelineItemMapper(), private val eventTimelineItemMapper: EventTimelineItemMapper = EventTimelineItemMapper(), ) { @@ -49,7 +49,7 @@ class MatrixTimelineItemMapper( return MatrixTimelineItem.Other } - private fun fetchEventDetails(eventId: EventId) = roomCoroutineScope.launch { + private fun fetchEventDetails(eventId: EventId) = coroutineScope.launch { fetchDetailsForEvent(eventId) } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt index dc012f67b3..2adcab5491 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt @@ -25,13 +25,13 @@ import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.catch import org.matrix.rustcomponents.sdk.PaginationStatusListener -import org.matrix.rustcomponents.sdk.Timeline import org.matrix.rustcomponents.sdk.TimelineDiff +import org.matrix.rustcomponents.sdk.TimelineInterface import org.matrix.rustcomponents.sdk.TimelineListener import timber.log.Timber import uniffi.matrix_sdk_ui.LiveBackPaginationStatus -internal fun Timeline.liveBackPaginationStatus(): Flow = callbackFlow { +internal fun TimelineInterface.liveBackPaginationStatus(): Flow = callbackFlow { val listener = object : PaginationStatusListener { override fun onUpdate(status: LiveBackPaginationStatus) { trySend(status) @@ -45,7 +45,7 @@ internal fun Timeline.liveBackPaginationStatus(): Flow Timber.d(it, "liveBackPaginationStatus() failed") }.buffer(Channel.UNLIMITED) -internal fun Timeline.timelineDiffFlow(): Flow> = +internal fun TimelineInterface.timelineDiffFlow(): Flow> = callbackFlow { val listener = object : TimelineListener { override fun onUpdate(diff: List) { @@ -62,7 +62,7 @@ internal fun Timeline.timelineDiffFlow(): Flow> = Timber.d(it, "timelineDiffFlow() failed") }.buffer(Channel.UNLIMITED) -internal suspend fun Timeline.runWithTimelineListenerRegistered(action: suspend () -> Unit) { +internal suspend fun TimelineInterface.runWithTimelineListenerRegistered(action: suspend () -> Unit) { val result = addListener(NoOpTimelineListener) try { action() diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt index 9f48e59995..fb42f27f62 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt @@ -56,6 +56,7 @@ import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow @@ -64,6 +65,7 @@ import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.getAndUpdate import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.launch @@ -88,9 +90,9 @@ class RustTimeline( private val inner: InnerTimeline, private val isLive: Boolean, systemClock: SystemClock, - roomCoroutineScope: CoroutineScope, isKeyBackupEnabled: Boolean, private val matrixRoom: MatrixRoom, + private val coroutineScope: CoroutineScope, private val dispatcher: CoroutineDispatcher, lastLoginTimestamp: Date?, private val roomContentForwarder: RoomContentForwarder, @@ -106,7 +108,7 @@ class RustTimeline( private val inReplyToMapper = InReplyToMapper(timelineEventContentMapper) private val timelineItemMapper = MatrixTimelineItemMapper( fetchDetailsForEvent = this::fetchDetailsForEvent, - roomCoroutineScope = roomCoroutineScope, + coroutineScope = coroutineScope, virtualTimelineItemMapper = VirtualTimelineItemMapper(), eventTimelineItemMapper = EventTimelineItemMapper( contentMapper = timelineEventContentMapper @@ -124,7 +126,7 @@ class RustTimeline( ) private val timelineItemsSubscriber = TimelineItemsSubscriber( timeline = inner, - roomCoroutineScope = roomCoroutineScope, + timelineCoroutineScope = coroutineScope, timelineDiffProcessor = timelineDiffProcessor, initLatch = initLatch, isInit = isInit, @@ -145,13 +147,11 @@ class RustTimeline( ) init { - roomCoroutineScope.launch(dispatcher) { - fetchMembers() - if (isLive) { - // When timeline is live, we need to listen to the back pagination status as - // sdk can automatically paginate backwards. - registerBackPaginationStatusListener() - } + coroutineScope.fetchMembers() + if (isLive) { + // When timeline is live, we need to listen to the back pagination status as + // sdk can automatically paginate backwards. + coroutineScope.registerBackPaginationStatusListener() } } @@ -243,9 +243,12 @@ class RustTimeline( } }.onStart { timelineItemsSubscriber.subscribeIfNeeded() + }.onCompletion { + timelineItemsSubscriber.unsubscribeIfNeeded() } override fun close() { + coroutineScope.cancel() inner.close() } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt index 0205ac20e5..a4086c2810 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt @@ -40,10 +40,9 @@ private const val INITIAL_MAX_SIZE = 50 * This class is responsible for subscribing to a timeline and post the items/diffs to the timelineDiffProcessor. * It will also trigger a callback when a new synced event is received. * It will also handle the initial items and make sure they are posted before any diff. - * When closing the room subscription, it will also unsubscribe automatically. */ internal class TimelineItemsSubscriber( - roomCoroutineScope: CoroutineScope, + timelineCoroutineScope: CoroutineScope, dispatcher: CoroutineDispatcher, private val timeline: Timeline, private val timelineDiffProcessor: MatrixTimelineDiffProcessor, @@ -54,8 +53,12 @@ internal class TimelineItemsSubscriber( private var subscriptionCount = 0 private val mutex = Mutex() - private val coroutineScope = roomCoroutineScope.childScope(dispatcher, "TimelineItemsSubscriber") + private val coroutineScope = timelineCoroutineScope.childScope(dispatcher, "TimelineItemsSubscriber") + /** + * Add a subscription to the timeline and start posting items/diffs to the timelineDiffProcessor. + * It will also trigger a callback when a new synced event is received. + */ suspend fun subscribeIfNeeded() = mutex.withLock { if (subscriptionCount == 0) { timeline.timelineDiffFlow() @@ -70,6 +73,11 @@ internal class TimelineItemsSubscriber( subscriptionCount++ } + /** + * Remove a subscription to the timeline and unsubscribe if needed. + * The timeline will be unsubscribed when the last subscription is removed. + * If the timelineCoroutineScope is cancelled, the timeline will be unsubscribed automatically. + */ suspend fun unsubscribeIfNeeded() = mutex.withLock { when (subscriptionCount) { 0 -> return@withLock From 5e6bcbd7ac9c95565ec89f8a78ecd02c29ea7855 Mon Sep 17 00:00:00 2001 From: ganfra Date: Wed, 17 Jul 2024 15:49:11 +0200 Subject: [PATCH 3/5] Performance : do not trigger back pagination when opening room. --- .../matrix/impl/timeline/RustTimeline.kt | 31 ++++++++++++++----- .../impl/timeline/TimelineItemsSubscriber.kt | 8 ++--- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt index fb42f27f62..fe2f01925d 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt @@ -81,7 +81,6 @@ import timber.log.Timber import uniffi.matrix_sdk_ui.LiveBackPaginationStatus import java.io.File import java.util.Date -import java.util.concurrent.atomic.AtomicBoolean import org.matrix.rustcomponents.sdk.Timeline as InnerTimeline private const val PAGINATION_SIZE = 50 @@ -99,7 +98,7 @@ class RustTimeline( onNewSyncedEvent: () -> Unit, ) : Timeline { private val initLatch = CompletableDeferred() - private val isInit = AtomicBoolean(false) + private val isInit = MutableStateFlow(false) private val _timelineItems: MutableStateFlow> = MutableStateFlow(emptyList()) @@ -208,7 +207,7 @@ class RustTimeline( } private fun canPaginate(direction: Timeline.PaginationDirection): Boolean { - if (!isInit.get()) return false + if (!isInit.value) return false return when (direction) { Timeline.PaginationDirection.BACKWARDS -> backPaginationStatus.value.canPaginate Timeline.PaginationDirection.FORWARDS -> forwardPaginationStatus.value.canPaginate @@ -226,20 +225,25 @@ class RustTimeline( _timelineItems, backPaginationStatus.map { it.hasMoreToLoad }.distinctUntilChanged(), forwardPaginationStatus.map { it.hasMoreToLoad }.distinctUntilChanged(), - ) { timelineItems, hasMoreToLoadBackward, hasMoreToLoadForward -> + isInit, + ) { timelineItems, hasMoreToLoadBackward, hasMoreToLoadForward, isInit -> withContext(dispatcher) { timelineItems - .let { items -> encryptedHistoryPostProcessor.process(items) } - .let { items -> + .process { items -> encryptedHistoryPostProcessor.process(items) } + .process { items -> roomBeginningPostProcessor.process( items = items, isDm = matrixRoom.isDm, hasMoreToLoadBackwards = hasMoreToLoadBackward ) } - .let { items -> loadingIndicatorsPostProcessor.process(items, hasMoreToLoadBackward, hasMoreToLoadForward) } + .process(predicate = isInit) { items -> + loadingIndicatorsPostProcessor.process(items, hasMoreToLoadBackward, hasMoreToLoadForward) + } // Keep lastForwardIndicatorsPostProcessor last - .let { items -> lastForwardIndicatorsPostProcessor.process(items) } + .process(predicate = isInit) { items -> + lastForwardIndicatorsPostProcessor.process(items) + } } }.onStart { timelineItemsSubscriber.subscribeIfNeeded() @@ -545,3 +549,14 @@ class RustTimeline( } } } + +private suspend fun List.process( + predicate: Boolean = true, + processor: suspend (List) -> List +): List { + return if (predicate) { + processor(this) + } else { + this + } +} diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt index a4086c2810..d5454ff254 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt @@ -23,6 +23,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.cancelChildren import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.sync.Mutex @@ -32,7 +33,6 @@ import org.matrix.rustcomponents.sdk.TimelineChange import org.matrix.rustcomponents.sdk.TimelineDiff import org.matrix.rustcomponents.sdk.TimelineItem import uniffi.matrix_sdk_ui.EventItemOrigin -import java.util.concurrent.atomic.AtomicBoolean private const val INITIAL_MAX_SIZE = 50 @@ -47,7 +47,7 @@ internal class TimelineItemsSubscriber( private val timeline: Timeline, private val timelineDiffProcessor: MatrixTimelineDiffProcessor, private val initLatch: CompletableDeferred, - private val isInit: AtomicBoolean, + private val isInit: MutableStateFlow, private val onNewSyncedEvent: () -> Unit, ) { private var subscriptionCount = 0 @@ -94,13 +94,13 @@ internal class TimelineItemsSubscriber( ensureActive() timelineDiffProcessor.postItems(it) } - isInit.set(true) + isInit.value = true initLatch.complete(Unit) } private suspend fun postDiffs(diffs: List) { val diffsToProcess = diffs.toMutableList() - if (!isInit.get()) { + if (!isInit.value) { val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET } if (resetDiff != null) { // Keep using the postItems logic so we can post the timelineItems asap. From 7fab94fdd7ac17f9c579cca30a36e5024a84aa40 Mon Sep 17 00:00:00 2001 From: ganfra Date: Wed, 17 Jul 2024 17:22:56 +0200 Subject: [PATCH 4/5] Performance : rename RustRoomObjects to RustRoomReferences --- .../matrix/impl/room/RustRoomFactory.kt | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt index a69cbc67d3..63d2912a42 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt @@ -62,12 +62,12 @@ class RustRoomFactory( private val mutex = Mutex() private var isDestroyed: Boolean = false - private data class RustRoomObjects( + private data class RustRoomReferences( val roomListItem: RoomListItem, val fullRoom: Room, ) - private val cache = lruCache( + private val cache = lruCache( maxSize = CACHE_SIZE, onEntryRemoved = { evicted, roomId, oldRoom, _ -> Timber.d("On room removed from cache: $roomId, evicted: $evicted") @@ -104,22 +104,22 @@ class RustRoomFactory( Timber.d("Room factory is destroyed, returning null for $roomId") return@withContext null } - var roomObjects: RustRoomObjects? = getRoomObjects(roomId) - if (roomObjects == null) { + var roomReferences: RustRoomReferences? = getRoomReferences(roomId) + if (roomReferences == null) { // ... otherwise, lets wait for the SS to load all rooms and check again. roomListService.allRooms.awaitLoaded() - roomObjects = getRoomObjects(roomId) + roomReferences = getRoomReferences(roomId) } - if (roomObjects == null) { + if (roomReferences == null) { Timber.d("No room found for $roomId, returning null") return@withContext null } - val liveTimeline = roomObjects.fullRoom.timeline() + val liveTimeline = roomReferences.fullRoom.timeline() RustMatrixRoom( sessionId = sessionId, isKeyBackupEnabled = isKeyBackupEnabled(), - roomListItem = roomObjects.roomListItem, - innerRoom = roomObjects.fullRoom, + roomListItem = roomReferences.roomListItem, + innerRoom = roomReferences.fullRoom, innerTimeline = liveTimeline, sessionCoroutineScope = sessionCoroutineScope, notificationSettingsService = notificationSettingsService, @@ -133,7 +133,7 @@ class RustRoomFactory( } } - private suspend fun getRoomObjects(roomId: RoomId): RustRoomObjects? { + private suspend fun getRoomReferences(roomId: RoomId): RustRoomReferences? { cache[roomId]?.let { Timber.d("Room found in cache for $roomId") return it @@ -150,7 +150,7 @@ class RustRoomFactory( return null } Timber.d("Got full room with timeline for $roomId") - return RustRoomObjects( + return RustRoomReferences( roomListItem = roomListItem, fullRoom = fullRoom, ).also { From f08d30bb62d769eb71223b918379201f83a6e38d Mon Sep 17 00:00:00 2001 From: ganfra Date: Wed, 17 Jul 2024 17:23:15 +0200 Subject: [PATCH 5/5] Performance : call roomFactory.destroy() --- .../element/android/libraries/matrix/impl/RustMatrixClient.kt | 3 +++ .../android/libraries/matrix/impl/room/RustRoomFactory.kt | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt index 13eb506b0a..ad37b0c917 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt @@ -490,6 +490,9 @@ class RustMatrixClient( override fun roomDirectoryService(): RoomDirectoryService = roomDirectoryService override fun close() { + appCoroutineScope.launch { + roomFactory.destroy() + } sessionCoroutineScope.cancel() clientDelegateTaskHandle?.cancelAndDestroy() notificationSettingsService.destroy() diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt index 63d2912a42..b559aacf86 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt @@ -31,6 +31,7 @@ import io.element.android.libraries.sessionstorage.api.SessionData import io.element.android.services.toolbox.api.systemclock.SystemClock import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext @@ -89,7 +90,7 @@ class RustRoomFactory( } suspend fun destroy() { - withContext(dispatcher) { + withContext(NonCancellable + dispatcher) { mutex.withLock { Timber.d("Destroying room factory") cache.evictAll()