Merge pull request #3186 from element-hq/feature/fga/investigate_performance_issue

Performance : improve time to open a room.
This commit is contained in:
ganfra 2024-07-17 19:18:03 +02:00 committed by GitHub
commit a1987168ca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 276 additions and 133 deletions

View file

@ -490,6 +490,9 @@ class RustMatrixClient(
override fun roomDirectoryService(): RoomDirectoryService = roomDirectoryService
override fun close() {
appCoroutineScope.launch {
roomFactory.destroy()
}
sessionCoroutineScope.cancel()
clientDelegateTaskHandle?.cancelAndDestroy()
notificationSettingsService.destroy()

View file

@ -196,8 +196,6 @@ class RustMatrixRoom(
override fun destroy() {
roomCoroutineScope.cancel()
liveTimeline.close()
innerRoom.destroy()
roomListItem.destroy()
}
override val displayName: String
@ -627,12 +625,13 @@ class RustMatrixRoom(
isLive: Boolean,
onNewSyncedEvent: () -> Unit = {},
): Timeline {
val timelineCoroutineScope = roomCoroutineScope.childScope(coroutineDispatchers.main, "TimelineScope-$roomId-$timeline")
return RustTimeline(
isKeyBackupEnabled = isKeyBackupEnabled,
isLive = isLive,
matrixRoom = this,
systemClock = systemClock,
roomCoroutineScope = roomCoroutineScope,
coroutineScope = timelineCoroutineScope,
dispatcher = roomDispatcher,
lastLoginTimestamp = sessionData.loginTimestamp,
onNewSyncedEvent = onNewSyncedEvent,

View file

@ -16,6 +16,7 @@
package io.element.android.libraries.matrix.impl.room
import androidx.collection.lruCache
import io.element.android.appconfig.TimelineConfig
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.matrix.api.core.RoomId
@ -30,6 +31,7 @@ import io.element.android.libraries.sessionstorage.api.SessionData
import io.element.android.services.toolbox.api.systemclock.SystemClock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
@ -41,6 +43,8 @@ import org.matrix.rustcomponents.sdk.TimelineEventTypeFilter
import timber.log.Timber
import org.matrix.rustcomponents.sdk.RoomListService as InnerRoomListService
private const val CACHE_SIZE = 16
class RustRoomFactory(
private val sessionId: SessionId,
private val notificationSettingsService: NotificationSettingsService,
@ -55,8 +59,23 @@ class RustRoomFactory(
private val getSessionData: suspend () -> SessionData,
) {
@OptIn(ExperimentalCoroutinesApi::class)
private val createRoomDispatcher = dispatchers.io.limitedParallelism(1)
private val dispatcher = dispatchers.io.limitedParallelism(1)
private val mutex = Mutex()
private var isDestroyed: Boolean = false
private data class RustRoomReferences(
val roomListItem: RoomListItem,
val fullRoom: Room,
)
private val cache = lruCache<RoomId, RustRoomReferences>(
maxSize = CACHE_SIZE,
onEntryRemoved = { evicted, roomId, oldRoom, _ ->
Timber.d("On room removed from cache: $roomId, evicted: $evicted")
oldRoom.roomListItem.close()
oldRoom.fullRoom.close()
}
)
private val matrixRoomInfoMapper = MatrixRoomInfoMapper()
@ -70,30 +89,41 @@ class RustRoomFactory(
)
}
suspend fun create(roomId: RoomId): MatrixRoom? = withContext(createRoomDispatcher) {
var cachedPairOfRoom: Pair<RoomListItem, Room>?
mutex.withLock {
// Check if already in memory...
cachedPairOfRoom = pairOfRoom(roomId)
if (cachedPairOfRoom == null) {
// ... otherwise, lets wait for the SS to load all rooms and check again.
roomListService.allRooms.awaitLoaded()
cachedPairOfRoom = pairOfRoom(roomId)
suspend fun destroy() {
withContext(NonCancellable + dispatcher) {
mutex.withLock {
Timber.d("Destroying room factory")
cache.evictAll()
isDestroyed = true
}
}
if (cachedPairOfRoom == null) {
Timber.d("No room found for $roomId")
return@withContext null
}
cachedPairOfRoom?.let { (roomListItem, fullRoom) ->
}
suspend fun create(roomId: RoomId): MatrixRoom? = withContext(dispatcher) {
mutex.withLock {
if (isDestroyed) {
Timber.d("Room factory is destroyed, returning null for $roomId")
return@withContext null
}
var roomReferences: RustRoomReferences? = getRoomReferences(roomId)
if (roomReferences == null) {
// ... otherwise, lets wait for the SS to load all rooms and check again.
roomListService.allRooms.awaitLoaded()
roomReferences = getRoomReferences(roomId)
}
if (roomReferences == null) {
Timber.d("No room found for $roomId, returning null")
return@withContext null
}
val liveTimeline = roomReferences.fullRoom.timeline()
RustMatrixRoom(
sessionId = sessionId,
isKeyBackupEnabled = isKeyBackupEnabled(),
roomListItem = roomListItem,
innerRoom = fullRoom,
innerTimeline = fullRoom.timeline(),
notificationSettingsService = notificationSettingsService,
roomListItem = roomReferences.roomListItem,
innerRoom = roomReferences.fullRoom,
innerTimeline = liveTimeline,
sessionCoroutineScope = sessionCoroutineScope,
notificationSettingsService = notificationSettingsService,
coroutineDispatchers = dispatchers,
systemClock = systemClock,
roomContentForwarder = roomContentForwarder,
@ -104,20 +134,28 @@ class RustRoomFactory(
}
}
private suspend fun pairOfRoom(roomId: RoomId): Pair<RoomListItem, Room>? {
val cachedRoomListItem = innerRoomListService.roomOrNull(roomId.value)
private suspend fun getRoomReferences(roomId: RoomId): RustRoomReferences? {
cache[roomId]?.let {
Timber.d("Room found in cache for $roomId")
return it
}
val roomListItem = innerRoomListService.roomOrNull(roomId.value)
if (roomListItem == null) {
Timber.d("Room not found for $roomId")
return null
}
val fullRoom = try {
cachedRoomListItem?.fullRoomWithTimeline(filter = eventFilters)
roomListItem.fullRoomWithTimeline(filter = eventFilters)
} catch (e: RoomListException) {
Timber.e(e, "Failed to get full room with timeline for $roomId")
null
return null
}
return if (cachedRoomListItem == null || fullRoom == null) {
Timber.d("No room cached for $roomId")
null
} else {
Timber.d("Found room cached for $roomId")
Pair(cachedRoomListItem, fullRoom)
Timber.d("Got full room with timeline for $roomId")
return RustRoomReferences(
roomListItem = roomListItem,
fullRoom = fullRoom,
).also {
cache.put(roomId, it)
}
}
}

View file

@ -26,7 +26,7 @@ import org.matrix.rustcomponents.sdk.TimelineItem
class MatrixTimelineItemMapper(
private val fetchDetailsForEvent: suspend (EventId) -> Result<Unit>,
private val roomCoroutineScope: CoroutineScope,
private val coroutineScope: CoroutineScope,
private val virtualTimelineItemMapper: VirtualTimelineItemMapper = VirtualTimelineItemMapper(),
private val eventTimelineItemMapper: EventTimelineItemMapper = EventTimelineItemMapper(),
) {
@ -49,7 +49,7 @@ class MatrixTimelineItemMapper(
return MatrixTimelineItem.Other
}
private fun fetchEventDetails(eventId: EventId) = roomCoroutineScope.launch {
private fun fetchEventDetails(eventId: EventId) = coroutineScope.launch {
fetchDetailsForEvent(eventId)
}
}

View file

@ -25,13 +25,13 @@ import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import org.matrix.rustcomponents.sdk.PaginationStatusListener
import org.matrix.rustcomponents.sdk.Timeline
import org.matrix.rustcomponents.sdk.TimelineDiff
import org.matrix.rustcomponents.sdk.TimelineInterface
import org.matrix.rustcomponents.sdk.TimelineListener
import timber.log.Timber
import uniffi.matrix_sdk_ui.LiveBackPaginationStatus
internal fun Timeline.liveBackPaginationStatus(): Flow<LiveBackPaginationStatus> = callbackFlow {
internal fun TimelineInterface.liveBackPaginationStatus(): Flow<LiveBackPaginationStatus> = callbackFlow {
val listener = object : PaginationStatusListener {
override fun onUpdate(status: LiveBackPaginationStatus) {
trySend(status)
@ -45,7 +45,7 @@ internal fun Timeline.liveBackPaginationStatus(): Flow<LiveBackPaginationStatus>
Timber.d(it, "liveBackPaginationStatus() failed")
}.buffer(Channel.UNLIMITED)
internal fun Timeline.timelineDiffFlow(): Flow<List<TimelineDiff>> =
internal fun TimelineInterface.timelineDiffFlow(): Flow<List<TimelineDiff>> =
callbackFlow {
val listener = object : TimelineListener {
override fun onUpdate(diff: List<TimelineDiff>) {
@ -62,7 +62,7 @@ internal fun Timeline.timelineDiffFlow(): Flow<List<TimelineDiff>> =
Timber.d(it, "timelineDiffFlow() failed")
}.buffer(Channel.UNLIMITED)
internal suspend fun Timeline.runWithTimelineListenerRegistered(action: suspend () -> Unit) {
internal suspend fun TimelineInterface.runWithTimelineListenerRegistered(action: suspend () -> Unit) {
val result = addListener(NoOpTimelineListener)
try {
action()

View file

@ -56,8 +56,7 @@ import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
@ -66,75 +65,78 @@ import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.getAndUpdate
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.FormattedBody
import org.matrix.rustcomponents.sdk.MessageFormat
import org.matrix.rustcomponents.sdk.RoomMessageEventContentWithoutRelation
import org.matrix.rustcomponents.sdk.SendAttachmentJoinHandle
import org.matrix.rustcomponents.sdk.TimelineChange
import org.matrix.rustcomponents.sdk.TimelineDiff
import org.matrix.rustcomponents.sdk.TimelineItem
import org.matrix.rustcomponents.sdk.messageEventContentFromHtml
import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown
import org.matrix.rustcomponents.sdk.use
import timber.log.Timber
import uniffi.matrix_sdk_ui.EventItemOrigin
import uniffi.matrix_sdk_ui.LiveBackPaginationStatus
import java.io.File
import java.util.Date
import java.util.concurrent.atomic.AtomicBoolean
import org.matrix.rustcomponents.sdk.Timeline as InnerTimeline
private const val INITIAL_MAX_SIZE = 50
private const val PAGINATION_SIZE = 50
class RustTimeline(
private val inner: InnerTimeline,
isLive: Boolean,
private val isLive: Boolean,
systemClock: SystemClock,
roomCoroutineScope: CoroutineScope,
isKeyBackupEnabled: Boolean,
private val matrixRoom: MatrixRoom,
private val coroutineScope: CoroutineScope,
private val dispatcher: CoroutineDispatcher,
lastLoginTimestamp: Date?,
private val roomContentForwarder: RoomContentForwarder,
private val onNewSyncedEvent: () -> Unit,
onNewSyncedEvent: () -> Unit,
) : Timeline {
private val initLatch = CompletableDeferred<Unit>()
private val isInit = AtomicBoolean(false)
private val isInit = MutableStateFlow(false)
private val _timelineItems: MutableStateFlow<List<MatrixTimelineItem>> =
MutableStateFlow(emptyList())
private val timelineEventContentMapper = TimelineEventContentMapper()
private val inReplyToMapper = InReplyToMapper(timelineEventContentMapper)
private val timelineItemMapper = MatrixTimelineItemMapper(
fetchDetailsForEvent = this::fetchDetailsForEvent,
coroutineScope = coroutineScope,
virtualTimelineItemMapper = VirtualTimelineItemMapper(),
eventTimelineItemMapper = EventTimelineItemMapper(
contentMapper = timelineEventContentMapper
)
)
private val timelineDiffProcessor = MatrixTimelineDiffProcessor(
timelineItems = _timelineItems,
timelineItemFactory = timelineItemMapper,
)
private val encryptedHistoryPostProcessor = TimelineEncryptedHistoryPostProcessor(
lastLoginTimestamp = lastLoginTimestamp,
isRoomEncrypted = matrixRoom.isEncrypted,
isKeyBackupEnabled = isKeyBackupEnabled,
dispatcher = dispatcher,
)
private val timelineItemsSubscriber = TimelineItemsSubscriber(
timeline = inner,
timelineCoroutineScope = coroutineScope,
timelineDiffProcessor = timelineDiffProcessor,
initLatch = initLatch,
isInit = isInit,
dispatcher = dispatcher,
onNewSyncedEvent = onNewSyncedEvent,
)
private val roomBeginningPostProcessor = RoomBeginningPostProcessor()
private val loadingIndicatorsPostProcessor = LoadingIndicatorsPostProcessor(systemClock)
private val lastForwardIndicatorsPostProcessor = LastForwardIndicatorsPostProcessor(isLive)
private val timelineEventContentMapper = TimelineEventContentMapper()
private val inReplyToMapper = InReplyToMapper(timelineEventContentMapper)
private val timelineItemFactory = MatrixTimelineItemMapper(
fetchDetailsForEvent = this::fetchDetailsForEvent,
roomCoroutineScope = roomCoroutineScope,
virtualTimelineItemMapper = VirtualTimelineItemMapper(),
eventTimelineItemMapper = EventTimelineItemMapper(
contentMapper = timelineEventContentMapper
)
)
private val timelineDiffProcessor = MatrixTimelineDiffProcessor(
timelineItems = _timelineItems,
timelineItemFactory = timelineItemFactory,
)
private val backPaginationStatus = MutableStateFlow(
Timeline.PaginationStatus(isPaginating = false, hasMoreToLoad = true)
)
@ -144,37 +146,27 @@ class RustTimeline(
)
init {
roomCoroutineScope.launch(dispatcher) {
inner.timelineDiffFlow()
.onEach { diffs ->
if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) {
onNewSyncedEvent()
}
postDiffs(diffs)
}
.launchIn(this)
launch {
fetchMembers()
}
if (isLive) {
// When timeline is live, we need to listen to the back pagination status as
// sdk can automatically paginate backwards.
inner.liveBackPaginationStatus()
.onEach { backPaginationStatus ->
updatePaginationStatus(Timeline.PaginationDirection.BACKWARDS) {
when (backPaginationStatus) {
is LiveBackPaginationStatus.Idle -> it.copy(isPaginating = false, hasMoreToLoad = !backPaginationStatus.hitStartOfTimeline)
is LiveBackPaginationStatus.Paginating -> it.copy(isPaginating = true, hasMoreToLoad = true)
}
}
}
.launchIn(this)
}
coroutineScope.fetchMembers()
if (isLive) {
// When timeline is live, we need to listen to the back pagination status as
// sdk can automatically paginate backwards.
coroutineScope.registerBackPaginationStatusListener()
}
}
private fun CoroutineScope.registerBackPaginationStatusListener() {
inner.liveBackPaginationStatus()
.onEach { backPaginationStatus ->
updatePaginationStatus(Timeline.PaginationDirection.BACKWARDS) {
when (backPaginationStatus) {
is LiveBackPaginationStatus.Idle -> it.copy(isPaginating = false, hasMoreToLoad = !backPaginationStatus.hitStartOfTimeline)
is LiveBackPaginationStatus.Paginating -> it.copy(isPaginating = true, hasMoreToLoad = true)
}
}
}
.launchIn(this)
}
override val membershipChangeEventReceived: Flow<Unit> = timelineDiffProcessor.membershipChangeEventReceived
override suspend fun sendReadReceipt(eventId: EventId, receiptType: ReceiptType): Result<Unit> {
@ -215,7 +207,7 @@ class RustTimeline(
}
private fun canPaginate(direction: Timeline.PaginationDirection): Boolean {
if (!isInit.get()) return false
if (!isInit.value) return false
return when (direction) {
Timeline.PaginationDirection.BACKWARDS -> backPaginationStatus.value.canPaginate
Timeline.PaginationDirection.FORWARDS -> forwardPaginationStatus.value.canPaginate
@ -233,28 +225,38 @@ class RustTimeline(
_timelineItems,
backPaginationStatus.map { it.hasMoreToLoad }.distinctUntilChanged(),
forwardPaginationStatus.map { it.hasMoreToLoad }.distinctUntilChanged(),
) { timelineItems, hasMoreToLoadBackward, hasMoreToLoadForward ->
isInit,
) { timelineItems, hasMoreToLoadBackward, hasMoreToLoadForward, isInit ->
withContext(dispatcher) {
timelineItems
.let { items -> encryptedHistoryPostProcessor.process(items) }
.let { items ->
.process { items -> encryptedHistoryPostProcessor.process(items) }
.process { items ->
roomBeginningPostProcessor.process(
items = items,
isDm = matrixRoom.isDm,
hasMoreToLoadBackwards = hasMoreToLoadBackward
)
}
.let { items -> loadingIndicatorsPostProcessor.process(items, hasMoreToLoadBackward, hasMoreToLoadForward) }
.process(predicate = isInit) { items ->
loadingIndicatorsPostProcessor.process(items, hasMoreToLoadBackward, hasMoreToLoadForward)
}
// Keep lastForwardIndicatorsPostProcessor last
.let { items -> lastForwardIndicatorsPostProcessor.process(items) }
.process(predicate = isInit) { items ->
lastForwardIndicatorsPostProcessor.process(items)
}
}
}.onStart {
timelineItemsSubscriber.subscribeIfNeeded()
}.onCompletion {
timelineItemsSubscriber.unsubscribeIfNeeded()
}
override fun close() {
coroutineScope.cancel()
inner.close()
}
private suspend fun fetchMembers() = withContext(dispatcher) {
private fun CoroutineScope.fetchMembers() = launch(dispatcher) {
initLatch.await()
try {
inner.fetchMembers()
@ -263,32 +265,6 @@ class RustTimeline(
}
}
private suspend fun postItems(items: List<TimelineItem>) = coroutineScope {
// Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap.
items.chunked(INITIAL_MAX_SIZE).reversed().forEach {
ensureActive()
timelineDiffProcessor.postItems(it)
}
isInit.set(true)
initLatch.complete(Unit)
}
private suspend fun postDiffs(diffs: List<TimelineDiff>) {
val diffsToProcess = diffs.toMutableList()
if (!isInit.get()) {
val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET }
if (resetDiff != null) {
// Keep using the postItems logic so we can post the timelineItems asap.
postItems(resetDiff.reset() ?: emptyList())
diffsToProcess.remove(resetDiff)
}
}
initLatch.await()
if (diffsToProcess.isNotEmpty()) {
timelineDiffProcessor.postDiffs(diffsToProcess)
}
}
override suspend fun sendMessage(body: String, htmlBody: String?, mentions: List<Mention>): Result<Unit> = withContext(dispatcher) {
messageEventContentFromParts(body, htmlBody).withMentions(mentions.map()).use { content ->
runCatching<Unit> {
@ -554,12 +530,6 @@ class RustTimeline(
}
}
private suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> {
return runCatching {
inner.fetchDetailsForEvent(eventId.value)
}
}
override suspend fun loadReplyDetails(eventId: EventId): InReplyTo = withContext(dispatcher) {
val timelineItem = _timelineItems.value.firstOrNull { timelineItem ->
timelineItem is MatrixTimelineItem.Event && timelineItem.eventId == eventId
@ -576,4 +546,21 @@ class RustTimeline(
inner.loadReplyDetails(eventId.value).use(inReplyToMapper::map)
}
}
private suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> {
return runCatching {
inner.fetchDetailsForEvent(eventId.value)
}
}
}
private suspend fun List<MatrixTimelineItem>.process(
predicate: Boolean = true,
processor: suspend (List<MatrixTimelineItem>) -> List<MatrixTimelineItem>
): List<MatrixTimelineItem> {
return if (predicate) {
processor(this)
} else {
this
}
}

View file

@ -0,0 +1,116 @@
/*
* Copyright (c) 2024 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.element.android.libraries.matrix.impl.timeline
import io.element.android.libraries.core.coroutine.childScope
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.matrix.rustcomponents.sdk.Timeline
import org.matrix.rustcomponents.sdk.TimelineChange
import org.matrix.rustcomponents.sdk.TimelineDiff
import org.matrix.rustcomponents.sdk.TimelineItem
import uniffi.matrix_sdk_ui.EventItemOrigin
private const val INITIAL_MAX_SIZE = 50
/**
* This class is responsible for subscribing to a timeline and post the items/diffs to the timelineDiffProcessor.
* It will also trigger a callback when a new synced event is received.
* It will also handle the initial items and make sure they are posted before any diff.
*/
internal class TimelineItemsSubscriber(
timelineCoroutineScope: CoroutineScope,
dispatcher: CoroutineDispatcher,
private val timeline: Timeline,
private val timelineDiffProcessor: MatrixTimelineDiffProcessor,
private val initLatch: CompletableDeferred<Unit>,
private val isInit: MutableStateFlow<Boolean>,
private val onNewSyncedEvent: () -> Unit,
) {
private var subscriptionCount = 0
private val mutex = Mutex()
private val coroutineScope = timelineCoroutineScope.childScope(dispatcher, "TimelineItemsSubscriber")
/**
* Add a subscription to the timeline and start posting items/diffs to the timelineDiffProcessor.
* It will also trigger a callback when a new synced event is received.
*/
suspend fun subscribeIfNeeded() = mutex.withLock {
if (subscriptionCount == 0) {
timeline.timelineDiffFlow()
.onEach { diffs ->
if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) {
onNewSyncedEvent()
}
postDiffs(diffs)
}
.launchIn(coroutineScope)
}
subscriptionCount++
}
/**
* Remove a subscription to the timeline and unsubscribe if needed.
* The timeline will be unsubscribed when the last subscription is removed.
* If the timelineCoroutineScope is cancelled, the timeline will be unsubscribed automatically.
*/
suspend fun unsubscribeIfNeeded() = mutex.withLock {
when (subscriptionCount) {
0 -> return@withLock
1 -> {
coroutineScope.coroutineContext.cancelChildren()
}
}
subscriptionCount--
}
private suspend fun postItems(items: List<TimelineItem>) = coroutineScope {
// Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap.
items.chunked(INITIAL_MAX_SIZE).reversed().forEach {
ensureActive()
timelineDiffProcessor.postItems(it)
}
isInit.value = true
initLatch.complete(Unit)
}
private suspend fun postDiffs(diffs: List<TimelineDiff>) {
val diffsToProcess = diffs.toMutableList()
if (!isInit.value) {
val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET }
if (resetDiff != null) {
// Keep using the postItems logic so we can post the timelineItems asap.
postItems(resetDiff.reset() ?: emptyList())
diffsToProcess.remove(resetDiff)
}
}
initLatch.await()
if (diffsToProcess.isNotEmpty()) {
timelineDiffProcessor.postDiffs(diffsToProcess)
}
}
}