diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt index 13eb506b0a..ad37b0c917 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt @@ -490,6 +490,9 @@ class RustMatrixClient( override fun roomDirectoryService(): RoomDirectoryService = roomDirectoryService override fun close() { + appCoroutineScope.launch { + roomFactory.destroy() + } sessionCoroutineScope.cancel() clientDelegateTaskHandle?.cancelAndDestroy() notificationSettingsService.destroy() diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt index 463defb284..d23e5efb2a 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt @@ -196,8 +196,6 @@ class RustMatrixRoom( override fun destroy() { roomCoroutineScope.cancel() liveTimeline.close() - innerRoom.destroy() - roomListItem.destroy() } override val displayName: String @@ -627,12 +625,13 @@ class RustMatrixRoom( isLive: Boolean, onNewSyncedEvent: () -> Unit = {}, ): Timeline { + val timelineCoroutineScope = roomCoroutineScope.childScope(coroutineDispatchers.main, "TimelineScope-$roomId-$timeline") return RustTimeline( isKeyBackupEnabled = isKeyBackupEnabled, isLive = isLive, matrixRoom = this, systemClock = systemClock, - roomCoroutineScope = roomCoroutineScope, + coroutineScope = timelineCoroutineScope, dispatcher = roomDispatcher, lastLoginTimestamp = sessionData.loginTimestamp, onNewSyncedEvent = onNewSyncedEvent, diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt index 8d3d056aab..b559aacf86 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt @@ -16,6 +16,7 @@ package io.element.android.libraries.matrix.impl.room +import androidx.collection.lruCache import io.element.android.appconfig.TimelineConfig import io.element.android.libraries.core.coroutine.CoroutineDispatchers import io.element.android.libraries.matrix.api.core.RoomId @@ -30,6 +31,7 @@ import io.element.android.libraries.sessionstorage.api.SessionData import io.element.android.services.toolbox.api.systemclock.SystemClock import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext @@ -41,6 +43,8 @@ import org.matrix.rustcomponents.sdk.TimelineEventTypeFilter import timber.log.Timber import org.matrix.rustcomponents.sdk.RoomListService as InnerRoomListService +private const val CACHE_SIZE = 16 + class RustRoomFactory( private val sessionId: SessionId, private val notificationSettingsService: NotificationSettingsService, @@ -55,8 +59,23 @@ class RustRoomFactory( private val getSessionData: suspend () -> SessionData, ) { @OptIn(ExperimentalCoroutinesApi::class) - private val createRoomDispatcher = dispatchers.io.limitedParallelism(1) + private val dispatcher = dispatchers.io.limitedParallelism(1) private val mutex = Mutex() + private var isDestroyed: Boolean = false + + private data class RustRoomReferences( + val roomListItem: RoomListItem, + val fullRoom: Room, + ) + + private val cache = lruCache( + maxSize = CACHE_SIZE, + onEntryRemoved = { evicted, roomId, oldRoom, _ -> + Timber.d("On room removed from cache: $roomId, evicted: $evicted") + oldRoom.roomListItem.close() + oldRoom.fullRoom.close() + } + ) private val matrixRoomInfoMapper = MatrixRoomInfoMapper() @@ -70,30 +89,41 @@ class RustRoomFactory( ) } - suspend fun create(roomId: RoomId): MatrixRoom? = withContext(createRoomDispatcher) { - var cachedPairOfRoom: Pair? - mutex.withLock { - // Check if already in memory... - cachedPairOfRoom = pairOfRoom(roomId) - if (cachedPairOfRoom == null) { - // ... otherwise, lets wait for the SS to load all rooms and check again. - roomListService.allRooms.awaitLoaded() - cachedPairOfRoom = pairOfRoom(roomId) + suspend fun destroy() { + withContext(NonCancellable + dispatcher) { + mutex.withLock { + Timber.d("Destroying room factory") + cache.evictAll() + isDestroyed = true } } - if (cachedPairOfRoom == null) { - Timber.d("No room found for $roomId") - return@withContext null - } - cachedPairOfRoom?.let { (roomListItem, fullRoom) -> + } + + suspend fun create(roomId: RoomId): MatrixRoom? = withContext(dispatcher) { + mutex.withLock { + if (isDestroyed) { + Timber.d("Room factory is destroyed, returning null for $roomId") + return@withContext null + } + var roomReferences: RustRoomReferences? = getRoomReferences(roomId) + if (roomReferences == null) { + // ... otherwise, lets wait for the SS to load all rooms and check again. + roomListService.allRooms.awaitLoaded() + roomReferences = getRoomReferences(roomId) + } + if (roomReferences == null) { + Timber.d("No room found for $roomId, returning null") + return@withContext null + } + val liveTimeline = roomReferences.fullRoom.timeline() RustMatrixRoom( sessionId = sessionId, isKeyBackupEnabled = isKeyBackupEnabled(), - roomListItem = roomListItem, - innerRoom = fullRoom, - innerTimeline = fullRoom.timeline(), - notificationSettingsService = notificationSettingsService, + roomListItem = roomReferences.roomListItem, + innerRoom = roomReferences.fullRoom, + innerTimeline = liveTimeline, sessionCoroutineScope = sessionCoroutineScope, + notificationSettingsService = notificationSettingsService, coroutineDispatchers = dispatchers, systemClock = systemClock, roomContentForwarder = roomContentForwarder, @@ -104,20 +134,28 @@ class RustRoomFactory( } } - private suspend fun pairOfRoom(roomId: RoomId): Pair? { - val cachedRoomListItem = innerRoomListService.roomOrNull(roomId.value) + private suspend fun getRoomReferences(roomId: RoomId): RustRoomReferences? { + cache[roomId]?.let { + Timber.d("Room found in cache for $roomId") + return it + } + val roomListItem = innerRoomListService.roomOrNull(roomId.value) + if (roomListItem == null) { + Timber.d("Room not found for $roomId") + return null + } val fullRoom = try { - cachedRoomListItem?.fullRoomWithTimeline(filter = eventFilters) + roomListItem.fullRoomWithTimeline(filter = eventFilters) } catch (e: RoomListException) { Timber.e(e, "Failed to get full room with timeline for $roomId") - null + return null } - return if (cachedRoomListItem == null || fullRoom == null) { - Timber.d("No room cached for $roomId") - null - } else { - Timber.d("Found room cached for $roomId") - Pair(cachedRoomListItem, fullRoom) + Timber.d("Got full room with timeline for $roomId") + return RustRoomReferences( + roomListItem = roomListItem, + fullRoom = fullRoom, + ).also { + cache.put(roomId, it) } } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineItemMapper.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineItemMapper.kt index c8cb4eef1c..d38c86f0f9 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineItemMapper.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineItemMapper.kt @@ -26,7 +26,7 @@ import org.matrix.rustcomponents.sdk.TimelineItem class MatrixTimelineItemMapper( private val fetchDetailsForEvent: suspend (EventId) -> Result, - private val roomCoroutineScope: CoroutineScope, + private val coroutineScope: CoroutineScope, private val virtualTimelineItemMapper: VirtualTimelineItemMapper = VirtualTimelineItemMapper(), private val eventTimelineItemMapper: EventTimelineItemMapper = EventTimelineItemMapper(), ) { @@ -49,7 +49,7 @@ class MatrixTimelineItemMapper( return MatrixTimelineItem.Other } - private fun fetchEventDetails(eventId: EventId) = roomCoroutineScope.launch { + private fun fetchEventDetails(eventId: EventId) = coroutineScope.launch { fetchDetailsForEvent(eventId) } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt index dc012f67b3..2adcab5491 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt @@ -25,13 +25,13 @@ import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.catch import org.matrix.rustcomponents.sdk.PaginationStatusListener -import org.matrix.rustcomponents.sdk.Timeline import org.matrix.rustcomponents.sdk.TimelineDiff +import org.matrix.rustcomponents.sdk.TimelineInterface import org.matrix.rustcomponents.sdk.TimelineListener import timber.log.Timber import uniffi.matrix_sdk_ui.LiveBackPaginationStatus -internal fun Timeline.liveBackPaginationStatus(): Flow = callbackFlow { +internal fun TimelineInterface.liveBackPaginationStatus(): Flow = callbackFlow { val listener = object : PaginationStatusListener { override fun onUpdate(status: LiveBackPaginationStatus) { trySend(status) @@ -45,7 +45,7 @@ internal fun Timeline.liveBackPaginationStatus(): Flow Timber.d(it, "liveBackPaginationStatus() failed") }.buffer(Channel.UNLIMITED) -internal fun Timeline.timelineDiffFlow(): Flow> = +internal fun TimelineInterface.timelineDiffFlow(): Flow> = callbackFlow { val listener = object : TimelineListener { override fun onUpdate(diff: List) { @@ -62,7 +62,7 @@ internal fun Timeline.timelineDiffFlow(): Flow> = Timber.d(it, "timelineDiffFlow() failed") }.buffer(Channel.UNLIMITED) -internal suspend fun Timeline.runWithTimelineListenerRegistered(action: suspend () -> Unit) { +internal suspend fun TimelineInterface.runWithTimelineListenerRegistered(action: suspend () -> Unit) { val result = addListener(NoOpTimelineListener) try { action() diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt index 23ea2a222d..00c79bf687 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt @@ -56,8 +56,7 @@ import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow @@ -66,75 +65,78 @@ import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.getAndUpdate import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import org.matrix.rustcomponents.sdk.FormattedBody import org.matrix.rustcomponents.sdk.MessageFormat import org.matrix.rustcomponents.sdk.RoomMessageEventContentWithoutRelation import org.matrix.rustcomponents.sdk.SendAttachmentJoinHandle -import org.matrix.rustcomponents.sdk.TimelineChange -import org.matrix.rustcomponents.sdk.TimelineDiff -import org.matrix.rustcomponents.sdk.TimelineItem import org.matrix.rustcomponents.sdk.messageEventContentFromHtml import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown import org.matrix.rustcomponents.sdk.use import timber.log.Timber -import uniffi.matrix_sdk_ui.EventItemOrigin import uniffi.matrix_sdk_ui.LiveBackPaginationStatus import java.io.File import java.util.Date -import java.util.concurrent.atomic.AtomicBoolean import org.matrix.rustcomponents.sdk.Timeline as InnerTimeline -private const val INITIAL_MAX_SIZE = 50 private const val PAGINATION_SIZE = 50 class RustTimeline( private val inner: InnerTimeline, - isLive: Boolean, + private val isLive: Boolean, systemClock: SystemClock, - roomCoroutineScope: CoroutineScope, isKeyBackupEnabled: Boolean, private val matrixRoom: MatrixRoom, + private val coroutineScope: CoroutineScope, private val dispatcher: CoroutineDispatcher, lastLoginTimestamp: Date?, private val roomContentForwarder: RoomContentForwarder, - private val onNewSyncedEvent: () -> Unit, + onNewSyncedEvent: () -> Unit, ) : Timeline { private val initLatch = CompletableDeferred() - private val isInit = AtomicBoolean(false) + private val isInit = MutableStateFlow(false) private val _timelineItems: MutableStateFlow> = MutableStateFlow(emptyList()) + private val timelineEventContentMapper = TimelineEventContentMapper() + private val inReplyToMapper = InReplyToMapper(timelineEventContentMapper) + private val timelineItemMapper = MatrixTimelineItemMapper( + fetchDetailsForEvent = this::fetchDetailsForEvent, + coroutineScope = coroutineScope, + virtualTimelineItemMapper = VirtualTimelineItemMapper(), + eventTimelineItemMapper = EventTimelineItemMapper( + contentMapper = timelineEventContentMapper + ) + ) + private val timelineDiffProcessor = MatrixTimelineDiffProcessor( + timelineItems = _timelineItems, + timelineItemFactory = timelineItemMapper, + ) private val encryptedHistoryPostProcessor = TimelineEncryptedHistoryPostProcessor( lastLoginTimestamp = lastLoginTimestamp, isRoomEncrypted = matrixRoom.isEncrypted, isKeyBackupEnabled = isKeyBackupEnabled, dispatcher = dispatcher, ) + private val timelineItemsSubscriber = TimelineItemsSubscriber( + timeline = inner, + timelineCoroutineScope = coroutineScope, + timelineDiffProcessor = timelineDiffProcessor, + initLatch = initLatch, + isInit = isInit, + dispatcher = dispatcher, + onNewSyncedEvent = onNewSyncedEvent, + ) private val roomBeginningPostProcessor = RoomBeginningPostProcessor() private val loadingIndicatorsPostProcessor = LoadingIndicatorsPostProcessor(systemClock) private val lastForwardIndicatorsPostProcessor = LastForwardIndicatorsPostProcessor(isLive) - private val timelineEventContentMapper = TimelineEventContentMapper() - private val inReplyToMapper = InReplyToMapper(timelineEventContentMapper) - private val timelineItemFactory = MatrixTimelineItemMapper( - fetchDetailsForEvent = this::fetchDetailsForEvent, - roomCoroutineScope = roomCoroutineScope, - virtualTimelineItemMapper = VirtualTimelineItemMapper(), - eventTimelineItemMapper = EventTimelineItemMapper( - contentMapper = timelineEventContentMapper - ) - ) - - private val timelineDiffProcessor = MatrixTimelineDiffProcessor( - timelineItems = _timelineItems, - timelineItemFactory = timelineItemFactory, - ) - private val backPaginationStatus = MutableStateFlow( Timeline.PaginationStatus(isPaginating = false, hasMoreToLoad = true) ) @@ -144,37 +146,27 @@ class RustTimeline( ) init { - roomCoroutineScope.launch(dispatcher) { - inner.timelineDiffFlow() - .onEach { diffs -> - if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) { - onNewSyncedEvent() - } - postDiffs(diffs) - } - .launchIn(this) - - launch { - fetchMembers() - } - - if (isLive) { - // When timeline is live, we need to listen to the back pagination status as - // sdk can automatically paginate backwards. - inner.liveBackPaginationStatus() - .onEach { backPaginationStatus -> - updatePaginationStatus(Timeline.PaginationDirection.BACKWARDS) { - when (backPaginationStatus) { - is LiveBackPaginationStatus.Idle -> it.copy(isPaginating = false, hasMoreToLoad = !backPaginationStatus.hitStartOfTimeline) - is LiveBackPaginationStatus.Paginating -> it.copy(isPaginating = true, hasMoreToLoad = true) - } - } - } - .launchIn(this) - } + coroutineScope.fetchMembers() + if (isLive) { + // When timeline is live, we need to listen to the back pagination status as + // sdk can automatically paginate backwards. + coroutineScope.registerBackPaginationStatusListener() } } + private fun CoroutineScope.registerBackPaginationStatusListener() { + inner.liveBackPaginationStatus() + .onEach { backPaginationStatus -> + updatePaginationStatus(Timeline.PaginationDirection.BACKWARDS) { + when (backPaginationStatus) { + is LiveBackPaginationStatus.Idle -> it.copy(isPaginating = false, hasMoreToLoad = !backPaginationStatus.hitStartOfTimeline) + is LiveBackPaginationStatus.Paginating -> it.copy(isPaginating = true, hasMoreToLoad = true) + } + } + } + .launchIn(this) + } + override val membershipChangeEventReceived: Flow = timelineDiffProcessor.membershipChangeEventReceived override suspend fun sendReadReceipt(eventId: EventId, receiptType: ReceiptType): Result { @@ -215,7 +207,7 @@ class RustTimeline( } private fun canPaginate(direction: Timeline.PaginationDirection): Boolean { - if (!isInit.get()) return false + if (!isInit.value) return false return when (direction) { Timeline.PaginationDirection.BACKWARDS -> backPaginationStatus.value.canPaginate Timeline.PaginationDirection.FORWARDS -> forwardPaginationStatus.value.canPaginate @@ -233,28 +225,38 @@ class RustTimeline( _timelineItems, backPaginationStatus.map { it.hasMoreToLoad }.distinctUntilChanged(), forwardPaginationStatus.map { it.hasMoreToLoad }.distinctUntilChanged(), - ) { timelineItems, hasMoreToLoadBackward, hasMoreToLoadForward -> + isInit, + ) { timelineItems, hasMoreToLoadBackward, hasMoreToLoadForward, isInit -> withContext(dispatcher) { timelineItems - .let { items -> encryptedHistoryPostProcessor.process(items) } - .let { items -> + .process { items -> encryptedHistoryPostProcessor.process(items) } + .process { items -> roomBeginningPostProcessor.process( items = items, isDm = matrixRoom.isDm, hasMoreToLoadBackwards = hasMoreToLoadBackward ) } - .let { items -> loadingIndicatorsPostProcessor.process(items, hasMoreToLoadBackward, hasMoreToLoadForward) } + .process(predicate = isInit) { items -> + loadingIndicatorsPostProcessor.process(items, hasMoreToLoadBackward, hasMoreToLoadForward) + } // Keep lastForwardIndicatorsPostProcessor last - .let { items -> lastForwardIndicatorsPostProcessor.process(items) } + .process(predicate = isInit) { items -> + lastForwardIndicatorsPostProcessor.process(items) + } } + }.onStart { + timelineItemsSubscriber.subscribeIfNeeded() + }.onCompletion { + timelineItemsSubscriber.unsubscribeIfNeeded() } override fun close() { + coroutineScope.cancel() inner.close() } - private suspend fun fetchMembers() = withContext(dispatcher) { + private fun CoroutineScope.fetchMembers() = launch(dispatcher) { initLatch.await() try { inner.fetchMembers() @@ -263,32 +265,6 @@ class RustTimeline( } } - private suspend fun postItems(items: List) = coroutineScope { - // Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap. - items.chunked(INITIAL_MAX_SIZE).reversed().forEach { - ensureActive() - timelineDiffProcessor.postItems(it) - } - isInit.set(true) - initLatch.complete(Unit) - } - - private suspend fun postDiffs(diffs: List) { - val diffsToProcess = diffs.toMutableList() - if (!isInit.get()) { - val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET } - if (resetDiff != null) { - // Keep using the postItems logic so we can post the timelineItems asap. - postItems(resetDiff.reset() ?: emptyList()) - diffsToProcess.remove(resetDiff) - } - } - initLatch.await() - if (diffsToProcess.isNotEmpty()) { - timelineDiffProcessor.postDiffs(diffsToProcess) - } - } - override suspend fun sendMessage(body: String, htmlBody: String?, mentions: List): Result = withContext(dispatcher) { messageEventContentFromParts(body, htmlBody).withMentions(mentions.map()).use { content -> runCatching { @@ -554,12 +530,6 @@ class RustTimeline( } } - private suspend fun fetchDetailsForEvent(eventId: EventId): Result { - return runCatching { - inner.fetchDetailsForEvent(eventId.value) - } - } - override suspend fun loadReplyDetails(eventId: EventId): InReplyTo = withContext(dispatcher) { val timelineItem = _timelineItems.value.firstOrNull { timelineItem -> timelineItem is MatrixTimelineItem.Event && timelineItem.eventId == eventId @@ -576,4 +546,21 @@ class RustTimeline( inner.loadReplyDetails(eventId.value).use(inReplyToMapper::map) } } + + private suspend fun fetchDetailsForEvent(eventId: EventId): Result { + return runCatching { + inner.fetchDetailsForEvent(eventId.value) + } + } +} + +private suspend fun List.process( + predicate: Boolean = true, + processor: suspend (List) -> List +): List { + return if (predicate) { + processor(this) + } else { + this + } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt new file mode 100644 index 0000000000..d5454ff254 --- /dev/null +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2024 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.element.android.libraries.matrix.impl.timeline + +import io.element.android.libraries.core.coroutine.childScope +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.cancelChildren +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import org.matrix.rustcomponents.sdk.Timeline +import org.matrix.rustcomponents.sdk.TimelineChange +import org.matrix.rustcomponents.sdk.TimelineDiff +import org.matrix.rustcomponents.sdk.TimelineItem +import uniffi.matrix_sdk_ui.EventItemOrigin + +private const val INITIAL_MAX_SIZE = 50 + +/** + * This class is responsible for subscribing to a timeline and post the items/diffs to the timelineDiffProcessor. + * It will also trigger a callback when a new synced event is received. + * It will also handle the initial items and make sure they are posted before any diff. + */ +internal class TimelineItemsSubscriber( + timelineCoroutineScope: CoroutineScope, + dispatcher: CoroutineDispatcher, + private val timeline: Timeline, + private val timelineDiffProcessor: MatrixTimelineDiffProcessor, + private val initLatch: CompletableDeferred, + private val isInit: MutableStateFlow, + private val onNewSyncedEvent: () -> Unit, +) { + private var subscriptionCount = 0 + private val mutex = Mutex() + + private val coroutineScope = timelineCoroutineScope.childScope(dispatcher, "TimelineItemsSubscriber") + + /** + * Add a subscription to the timeline and start posting items/diffs to the timelineDiffProcessor. + * It will also trigger a callback when a new synced event is received. + */ + suspend fun subscribeIfNeeded() = mutex.withLock { + if (subscriptionCount == 0) { + timeline.timelineDiffFlow() + .onEach { diffs -> + if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) { + onNewSyncedEvent() + } + postDiffs(diffs) + } + .launchIn(coroutineScope) + } + subscriptionCount++ + } + + /** + * Remove a subscription to the timeline and unsubscribe if needed. + * The timeline will be unsubscribed when the last subscription is removed. + * If the timelineCoroutineScope is cancelled, the timeline will be unsubscribed automatically. + */ + suspend fun unsubscribeIfNeeded() = mutex.withLock { + when (subscriptionCount) { + 0 -> return@withLock + 1 -> { + coroutineScope.coroutineContext.cancelChildren() + } + } + subscriptionCount-- + } + + private suspend fun postItems(items: List) = coroutineScope { + // Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap. + items.chunked(INITIAL_MAX_SIZE).reversed().forEach { + ensureActive() + timelineDiffProcessor.postItems(it) + } + isInit.value = true + initLatch.complete(Unit) + } + + private suspend fun postDiffs(diffs: List) { + val diffsToProcess = diffs.toMutableList() + if (!isInit.value) { + val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET } + if (resetDiff != null) { + // Keep using the postItems logic so we can post the timelineItems asap. + postItems(resetDiff.reset() ?: emptyList()) + diffsToProcess.remove(resetDiff) + } + } + initLatch.await() + if (diffsToProcess.isNotEmpty()) { + timelineDiffProcessor.postDiffs(diffsToProcess) + } + } +}