misc : simplify timeline diff logic
This commit is contained in:
parent
e752323f37
commit
385b8c37cb
9 changed files with 10 additions and 113 deletions
|
|
@ -28,14 +28,6 @@ internal class MatrixTimelineDiffProcessor(
|
|||
private val _membershipChangeEventReceived = MutableSharedFlow<Unit>(extraBufferCapacity = 1)
|
||||
val membershipChangeEventReceived: Flow<Unit> = _membershipChangeEventReceived
|
||||
|
||||
suspend fun postItems(items: List<TimelineItem>) {
|
||||
updateTimelineItems {
|
||||
Timber.v("Update timeline items from postItems (with ${items.size} items) on ${Thread.currentThread()}")
|
||||
val mappedItems = items.map { it.asMatrixTimelineItem() }
|
||||
addAll(0, mappedItems)
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun postDiffs(diffs: List<TimelineDiff>) {
|
||||
updateTimelineItems {
|
||||
Timber.v("Update timeline items from postDiffs (with ${diffs.size} items) on ${Thread.currentThread()}")
|
||||
|
|
|
|||
|
|
@ -48,7 +48,6 @@ import io.element.android.libraries.matrix.impl.timeline.postprocessor.TypingNot
|
|||
import io.element.android.libraries.matrix.impl.timeline.reply.InReplyToMapper
|
||||
import io.element.android.libraries.matrix.impl.util.MessageEventContent
|
||||
import io.element.android.services.toolbox.api.systemclock.SystemClock
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
|
|
@ -95,9 +94,6 @@ class RustTimeline(
|
|||
private val featureFlagsService: FeatureFlagService,
|
||||
onNewSyncedEvent: () -> Unit,
|
||||
) : Timeline {
|
||||
private val initLatch = CompletableDeferred<Unit>()
|
||||
private val isTimelineInitialized = MutableStateFlow(false)
|
||||
|
||||
private val _timelineItems: MutableSharedFlow<List<MatrixTimelineItem>> =
|
||||
MutableSharedFlow(replay = 1, extraBufferCapacity = Int.MAX_VALUE)
|
||||
|
||||
|
|
@ -119,8 +115,6 @@ class RustTimeline(
|
|||
timeline = inner,
|
||||
timelineCoroutineScope = coroutineScope,
|
||||
timelineDiffProcessor = timelineDiffProcessor,
|
||||
initLatch = initLatch,
|
||||
isTimelineInitialized = isTimelineInitialized,
|
||||
dispatcher = dispatcher,
|
||||
onNewSyncedEvent = onNewSyncedEvent,
|
||||
)
|
||||
|
|
@ -181,7 +175,6 @@ class RustTimeline(
|
|||
// Use NonCancellable to avoid breaking the timeline when the coroutine is cancelled.
|
||||
override suspend fun paginate(direction: Timeline.PaginationDirection): Result<Boolean> = withContext(NonCancellable) {
|
||||
withContext(dispatcher) {
|
||||
initLatch.await()
|
||||
runCatchingExceptions {
|
||||
if (!canPaginate(direction)) throw TimelineException.CannotPaginate
|
||||
updatePaginationStatus(direction) { it.copy(isPaginating = true) }
|
||||
|
|
@ -203,7 +196,6 @@ class RustTimeline(
|
|||
}
|
||||
|
||||
private fun canPaginate(direction: Timeline.PaginationDirection): Boolean {
|
||||
if (!isTimelineInitialized.value) return false
|
||||
return when (direction) {
|
||||
Timeline.PaginationDirection.BACKWARDS -> backwardPaginationStatus.value.canPaginate
|
||||
Timeline.PaginationDirection.FORWARDS -> forwardPaginationStatus.value.canPaginate
|
||||
|
|
@ -215,12 +207,10 @@ class RustTimeline(
|
|||
backwardPaginationStatus,
|
||||
forwardPaginationStatus,
|
||||
joinedRoom.roomInfoFlow.map { it.creator to it.isDm }.distinctUntilChanged(),
|
||||
isTimelineInitialized,
|
||||
) { timelineItems,
|
||||
backwardPaginationStatus,
|
||||
forwardPaginationStatus,
|
||||
(roomCreator, isDm),
|
||||
isTimelineInitialized ->
|
||||
(roomCreator, isDm) ->
|
||||
withContext(dispatcher) {
|
||||
timelineItems
|
||||
.let { items ->
|
||||
|
|
@ -234,7 +224,6 @@ class RustTimeline(
|
|||
.let { items ->
|
||||
loadingIndicatorsPostProcessor.process(
|
||||
items = items,
|
||||
isTimelineInitialized = isTimelineInitialized,
|
||||
hasMoreToLoadBackward = backwardPaginationStatus.hasMoreToLoad,
|
||||
hasMoreToLoadForward = forwardPaginationStatus.hasMoreToLoad,
|
||||
)
|
||||
|
|
@ -244,10 +233,7 @@ class RustTimeline(
|
|||
}
|
||||
// Keep lastForwardIndicatorsPostProcessor last
|
||||
.let { items ->
|
||||
lastForwardIndicatorsPostProcessor.process(
|
||||
items = items,
|
||||
isTimelineInitialized = isTimelineInitialized,
|
||||
)
|
||||
lastForwardIndicatorsPostProcessor.process(items = items)
|
||||
}
|
||||
}
|
||||
}.onStart {
|
||||
|
|
@ -262,7 +248,6 @@ class RustTimeline(
|
|||
}
|
||||
|
||||
private fun CoroutineScope.fetchMembers() = launch(dispatcher) {
|
||||
initLatch.await()
|
||||
try {
|
||||
inner.fetchMembers()
|
||||
} catch (exception: Exception) {
|
||||
|
|
|
|||
|
|
@ -8,37 +8,26 @@
|
|||
package io.element.android.libraries.matrix.impl.timeline
|
||||
|
||||
import io.element.android.libraries.core.coroutine.childScope
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.cancelChildren
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.ensureActive
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import org.matrix.rustcomponents.sdk.Timeline
|
||||
import org.matrix.rustcomponents.sdk.TimelineChange
|
||||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineItem
|
||||
import uniffi.matrix_sdk_ui.EventItemOrigin
|
||||
|
||||
private const val INITIAL_MAX_SIZE = 50
|
||||
|
||||
/**
|
||||
* This class is responsible for subscribing to a timeline and post the items/diffs to the timelineDiffProcessor.
|
||||
* It will also trigger a callback when a new synced event is received.
|
||||
* It will also handle the initial items and make sure they are posted before any diff.
|
||||
*/
|
||||
internal class TimelineItemsSubscriber(
|
||||
timelineCoroutineScope: CoroutineScope,
|
||||
dispatcher: CoroutineDispatcher,
|
||||
private val timeline: Timeline,
|
||||
private val timelineDiffProcessor: MatrixTimelineDiffProcessor,
|
||||
private val initLatch: CompletableDeferred<Unit>,
|
||||
private val isTimelineInitialized: MutableStateFlow<Boolean>,
|
||||
private val onNewSyncedEvent: () -> Unit,
|
||||
) {
|
||||
private var subscriptionCount = 0
|
||||
|
|
@ -57,7 +46,7 @@ internal class TimelineItemsSubscriber(
|
|||
if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) {
|
||||
onNewSyncedEvent()
|
||||
}
|
||||
postDiffs(diffs)
|
||||
timelineDiffProcessor.postDiffs(diffs)
|
||||
}
|
||||
.launchIn(coroutineScope)
|
||||
}
|
||||
|
|
@ -78,35 +67,4 @@ internal class TimelineItemsSubscriber(
|
|||
}
|
||||
subscriptionCount--
|
||||
}
|
||||
|
||||
private suspend fun postItems(items: List<TimelineItem>) = coroutineScope {
|
||||
if (items.isEmpty()) {
|
||||
// Makes sure to post empty list if there is no item, so you can handle empty state.
|
||||
timelineDiffProcessor.postItems(emptyList())
|
||||
} else {
|
||||
// Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap.
|
||||
items.chunked(INITIAL_MAX_SIZE).reversed().forEach {
|
||||
ensureActive()
|
||||
timelineDiffProcessor.postItems(it)
|
||||
}
|
||||
}
|
||||
isTimelineInitialized.value = true
|
||||
initLatch.complete(Unit)
|
||||
}
|
||||
|
||||
private suspend fun postDiffs(diffs: List<TimelineDiff>) {
|
||||
val diffsToProcess = diffs.toMutableList()
|
||||
if (!isTimelineInitialized.value) {
|
||||
val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET }
|
||||
if (resetDiff != null) {
|
||||
// Keep using the postItems logic so we can post the timelineItems asap.
|
||||
postItems(resetDiff.reset() ?: emptyList())
|
||||
diffsToProcess.remove(resetDiff)
|
||||
}
|
||||
}
|
||||
initLatch.await()
|
||||
if (diffsToProcess.isNotEmpty()) {
|
||||
timelineDiffProcessor.postDiffs(diffsToProcess)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,9 +22,7 @@ class LastForwardIndicatorsPostProcessor(
|
|||
|
||||
fun process(
|
||||
items: List<MatrixTimelineItem>,
|
||||
isTimelineInitialized: Boolean,
|
||||
): List<MatrixTimelineItem> {
|
||||
if (!isTimelineInitialized) return items
|
||||
// We don't need to add the last forward indicator if we are not in the FOCUSED_ON_EVENT mode
|
||||
if (mode != Timeline.Mode.FOCUSED_ON_EVENT) {
|
||||
return items
|
||||
|
|
|
|||
|
|
@ -16,11 +16,9 @@ import io.element.android.services.toolbox.api.systemclock.SystemClock
|
|||
class LoadingIndicatorsPostProcessor(private val systemClock: SystemClock) {
|
||||
fun process(
|
||||
items: List<MatrixTimelineItem>,
|
||||
isTimelineInitialized: Boolean,
|
||||
hasMoreToLoadBackward: Boolean,
|
||||
hasMoreToLoadForward: Boolean,
|
||||
): List<MatrixTimelineItem> {
|
||||
if (!isTimelineInitialized) return items
|
||||
val shouldAddForwardLoadingIndicator = hasMoreToLoadForward && items.isNotEmpty()
|
||||
val currentTimestamp = systemClock.epochMillis()
|
||||
return buildList {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue