Timeline : start reworking timeline apis
This commit is contained in:
parent
2ae6931486
commit
83e7c13618
33 changed files with 593 additions and 471 deletions
|
|
@ -42,7 +42,7 @@ import io.element.android.libraries.matrix.api.room.location.AssetType
|
|||
import io.element.android.libraries.matrix.api.room.powerlevels.MatrixRoomPowerLevels
|
||||
import io.element.android.libraries.matrix.api.room.powerlevels.UserRoleChange
|
||||
import io.element.android.libraries.matrix.api.room.roomNotificationSettings
|
||||
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
|
||||
import io.element.android.libraries.matrix.api.timeline.LiveTimeline
|
||||
import io.element.android.libraries.matrix.api.timeline.ReceiptType
|
||||
import io.element.android.libraries.matrix.api.widget.MatrixWidgetDriver
|
||||
import io.element.android.libraries.matrix.api.widget.MatrixWidgetSettings
|
||||
|
|
@ -56,7 +56,7 @@ import io.element.android.libraries.matrix.impl.room.location.toInner
|
|||
import io.element.android.libraries.matrix.impl.room.member.RoomMemberListFetcher
|
||||
import io.element.android.libraries.matrix.impl.room.member.RoomMemberMapper
|
||||
import io.element.android.libraries.matrix.impl.room.powerlevels.RoomPowerLevelsMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.RustMatrixTimeline
|
||||
import io.element.android.libraries.matrix.impl.timeline.RustLiveTimeline
|
||||
import io.element.android.libraries.matrix.impl.timeline.toRustReceiptType
|
||||
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
|
||||
import io.element.android.libraries.matrix.impl.widget.RustWidgetDriver
|
||||
|
|
@ -159,7 +159,7 @@ class RustMatrixRoom(
|
|||
private val _roomNotificationSettingsStateFlow = MutableStateFlow<MatrixRoomNotificationSettingsState>(MatrixRoomNotificationSettingsState.Unknown)
|
||||
override val roomNotificationSettingsStateFlow: StateFlow<MatrixRoomNotificationSettingsState> = _roomNotificationSettingsStateFlow
|
||||
|
||||
override val timeline = createMatrixTimeline(innerTimeline) {
|
||||
override val liveTimeline = createLiveTimeline(innerTimeline){
|
||||
_syncUpdateFlow.value = systemClock.epochMillis()
|
||||
}
|
||||
|
||||
|
|
@ -169,7 +169,7 @@ class RustMatrixRoom(
|
|||
|
||||
init {
|
||||
val powerLevelChanges = roomInfoFlow.map { it.userPowerLevels }.distinctUntilChanged()
|
||||
val membershipChanges = timeline.membershipChangeEventReceived.onStart { emit(Unit) }
|
||||
val membershipChanges = liveTimeline.membershipChangeEventReceived.onStart { emit(Unit) }
|
||||
combine(membershipChanges, powerLevelChanges) { _, _ -> }
|
||||
// Skip initial one
|
||||
.drop(1)
|
||||
|
|
@ -184,7 +184,7 @@ class RustMatrixRoom(
|
|||
|
||||
override fun destroy() {
|
||||
roomCoroutineScope.cancel()
|
||||
timeline.close()
|
||||
liveTimeline.close()
|
||||
innerRoom.destroy()
|
||||
roomListItem.destroy()
|
||||
specialModeEventTimelineItem?.destroy()
|
||||
|
|
@ -744,18 +744,24 @@ class RustMatrixRoom(
|
|||
}
|
||||
}
|
||||
|
||||
private fun createMatrixTimeline(
|
||||
private fun createLiveTimeline(
|
||||
timeline: InnerTimeline,
|
||||
onNewSyncedEvent: () -> Unit = {},
|
||||
): MatrixTimeline {
|
||||
return RustMatrixTimeline(
|
||||
): LiveTimeline {
|
||||
return RustLiveTimeline(
|
||||
isKeyBackupEnabled = isKeyBackupEnabled,
|
||||
matrixRoom = this,
|
||||
systemClock = systemClock,
|
||||
roomCoroutineScope = roomCoroutineScope,
|
||||
dispatcher = roomDispatcher,
|
||||
lastLoginTimestamp = sessionData.loginTimestamp,
|
||||
onNewSyncedEvent = onNewSyncedEvent,
|
||||
innerTimeline = timeline,
|
||||
inner = timeline,
|
||||
fetchDetailsForEvent = { eventId ->
|
||||
runCatching {
|
||||
innerTimeline.getEventTimelineItemByEventId(eventId.value)
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,106 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2023 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.timeline
|
||||
|
||||
import io.element.android.libraries.matrix.api.core.EventId
|
||||
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
|
||||
import io.element.android.libraries.matrix.api.timeline.MatrixTimelineItem
|
||||
import io.element.android.libraries.matrix.api.timeline.ReceiptType
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.CoroutineStart
|
||||
import kotlinx.coroutines.NonCancellable
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
import timber.log.Timber
|
||||
|
||||
/**
|
||||
* This class is a wrapper around a [MatrixTimeline] that will be created asynchronously.
|
||||
*/
|
||||
@Suppress("unused")
|
||||
class AsyncMatrixTimeline(
|
||||
coroutineScope: CoroutineScope,
|
||||
dispatcher: CoroutineDispatcher,
|
||||
private val timelineProvider: suspend () -> MatrixTimeline
|
||||
) : MatrixTimeline {
|
||||
private val _timelineItems: MutableStateFlow<List<MatrixTimelineItem>> =
|
||||
MutableStateFlow(emptyList())
|
||||
|
||||
private val _paginationState = MutableStateFlow(
|
||||
MatrixTimeline.PaginationState.Initial
|
||||
)
|
||||
private val timeline = coroutineScope.async(context = dispatcher, start = CoroutineStart.LAZY) {
|
||||
timelineProvider()
|
||||
}
|
||||
private val closeSignal = CompletableDeferred<Unit>()
|
||||
|
||||
override val membershipChangeEventReceived = MutableSharedFlow<Unit>(extraBufferCapacity = 1)
|
||||
|
||||
init {
|
||||
coroutineScope.launch {
|
||||
val delegateTimeline = timeline.await()
|
||||
delegateTimeline.timelineItems
|
||||
.onEach { _timelineItems.value = it }
|
||||
.launchIn(this)
|
||||
delegateTimeline.paginationState
|
||||
.onEach { _paginationState.value = it }
|
||||
.launchIn(this)
|
||||
delegateTimeline.membershipChangeEventReceived
|
||||
.onEach { membershipChangeEventReceived.emit(it) }
|
||||
.launchIn(this)
|
||||
|
||||
launch {
|
||||
withContext(NonCancellable) {
|
||||
closeSignal.await()
|
||||
Timber.d("Close delegate")
|
||||
delegateTimeline.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override val paginationState: StateFlow<MatrixTimeline.PaginationState> = _paginationState
|
||||
override val timelineItems: Flow<List<MatrixTimelineItem>> = _timelineItems
|
||||
|
||||
override suspend fun paginateBackwards(requestSize: Int): Result<Unit> {
|
||||
return timeline.await().paginateBackwards(requestSize)
|
||||
}
|
||||
|
||||
override suspend fun paginateBackwards(requestSize: Int, untilNumberOfItems: Int): Result<Unit> {
|
||||
return timeline.await().paginateBackwards(requestSize, untilNumberOfItems)
|
||||
}
|
||||
|
||||
override suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> {
|
||||
return timeline.await().fetchDetailsForEvent(eventId)
|
||||
}
|
||||
|
||||
override suspend fun sendReadReceipt(eventId: EventId, receiptType: ReceiptType): Result<Unit> {
|
||||
return timeline.await().sendReadReceipt(eventId, receiptType)
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
closeSignal.complete(Unit)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
/*
|
||||
* Copyright (c) 2024 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.timeline
|
||||
|
||||
class RustDetachedTimeline {
|
||||
}
|
||||
|
|
@ -0,0 +1,198 @@
|
|||
/*
|
||||
* Copyright (c) 2024 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.timeline
|
||||
|
||||
import io.element.android.libraries.matrix.api.core.EventId
|
||||
import io.element.android.libraries.matrix.api.room.MatrixRoom
|
||||
import io.element.android.libraries.matrix.api.timeline.LiveTimeline
|
||||
import io.element.android.libraries.matrix.api.timeline.MatrixTimelineItem
|
||||
import io.element.android.libraries.matrix.api.timeline.ReceiptType
|
||||
import io.element.android.libraries.matrix.api.timeline.Timeline
|
||||
import io.element.android.libraries.matrix.api.timeline.TimelineException
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.event.EventMessageMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.event.EventTimelineItemMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.event.TimelineEventContentMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.virtual.VirtualTimelineItemMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.postprocessor.LoadingIndicatorsPostProcessor
|
||||
import io.element.android.libraries.matrix.impl.timeline.postprocessor.RoomBeginningPostProcessor
|
||||
import io.element.android.libraries.matrix.impl.timeline.postprocessor.TimelineEncryptedHistoryPostProcessor
|
||||
import io.element.android.services.toolbox.api.systemclock.SystemClock
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.ensureActive
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.flow.stateIn
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineItem
|
||||
import timber.log.Timber
|
||||
import uniffi.matrix_sdk_ui.EventItemOrigin
|
||||
import java.util.Date
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import org.matrix.rustcomponents.sdk.Timeline as InnerTimeline
|
||||
|
||||
private const val INITIAL_MAX_SIZE = 50
|
||||
|
||||
class RustLiveTimeline(
|
||||
private val inner: InnerTimeline,
|
||||
private val systemClock: SystemClock,
|
||||
private val roomCoroutineScope: CoroutineScope,
|
||||
private val isKeyBackupEnabled: Boolean,
|
||||
private val matrixRoom: MatrixRoom,
|
||||
private val dispatcher: CoroutineDispatcher,
|
||||
private val lastLoginTimestamp: Date?,
|
||||
private val fetchDetailsForEvent: suspend (EventId) -> Result<Unit>,
|
||||
private val onNewSyncedEvent: () -> Unit,
|
||||
) : LiveTimeline {
|
||||
|
||||
private val initLatch = CompletableDeferred<Unit>()
|
||||
private val isInit = AtomicBoolean(false)
|
||||
|
||||
private val _timelineItems: MutableStateFlow<List<MatrixTimelineItem>> =
|
||||
MutableStateFlow(emptyList())
|
||||
|
||||
private val encryptedHistoryPostProcessor = TimelineEncryptedHistoryPostProcessor(
|
||||
lastLoginTimestamp = lastLoginTimestamp,
|
||||
isRoomEncrypted = matrixRoom.isEncrypted,
|
||||
isKeyBackupEnabled = isKeyBackupEnabled,
|
||||
dispatcher = dispatcher,
|
||||
)
|
||||
|
||||
private val roomBeginningPostProcessor = RoomBeginningPostProcessor()
|
||||
private val loadingIndicatorsPostProcessor = LoadingIndicatorsPostProcessor(systemClock)
|
||||
|
||||
private val timelineItemFactory = MatrixTimelineItemMapper(
|
||||
fetchDetailsForEvent = fetchDetailsForEvent,
|
||||
roomCoroutineScope = roomCoroutineScope,
|
||||
virtualTimelineItemMapper = VirtualTimelineItemMapper(),
|
||||
eventTimelineItemMapper = EventTimelineItemMapper(
|
||||
contentMapper = TimelineEventContentMapper(
|
||||
eventMessageMapper = EventMessageMapper()
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
private val timelineDiffProcessor = MatrixTimelineDiffProcessor(
|
||||
timelineItems = _timelineItems,
|
||||
timelineItemFactory = timelineItemFactory,
|
||||
)
|
||||
|
||||
init {
|
||||
roomCoroutineScope.launch(dispatcher) {
|
||||
inner.timelineDiffFlow { initialList ->
|
||||
postItems(initialList)
|
||||
}.onEach { diffs ->
|
||||
if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) {
|
||||
onNewSyncedEvent()
|
||||
}
|
||||
postDiffs(diffs)
|
||||
}.launchIn(this)
|
||||
|
||||
launch {
|
||||
fetchMembers()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override val membershipChangeEventReceived: Flow<Unit> = timelineDiffProcessor.membershipChangeEventReceived
|
||||
|
||||
override suspend fun sendReadReceipt(eventId: EventId, receiptType: ReceiptType): Result<Unit> {
|
||||
return runCatching {
|
||||
inner.sendReadReceipt(receiptType.toRustReceiptType(), eventId.value)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun paginateBackwards(): Result<Boolean> {
|
||||
initLatch.await()
|
||||
return runCatching {
|
||||
if (!canBackPaginate()) throw TimelineException.CannotPaginate
|
||||
inner.paginateBackwards()
|
||||
}.onFailure { error ->
|
||||
if (error is TimelineException.CannotPaginate) {
|
||||
Timber.d("Can't paginate backwards on room ${matrixRoom.roomId} with backPaginationStatus: ${backPaginationStatus.value}")
|
||||
} else {
|
||||
Timber.e(error, "Error paginating backwards on room ${matrixRoom.roomId}")
|
||||
}
|
||||
}.onSuccess {
|
||||
Timber.v("Success back paginating for room ${matrixRoom.roomId}")
|
||||
}
|
||||
}
|
||||
|
||||
private fun canBackPaginate(): Boolean {
|
||||
return isInit.get() && backPaginationStatus.value.canPaginate
|
||||
}
|
||||
|
||||
override val backPaginationStatus: StateFlow<Timeline.PaginationStatus> = inner
|
||||
.backPaginationStatusFlow()
|
||||
.map()
|
||||
.stateIn(roomCoroutineScope, SharingStarted.Eagerly, Timeline.PaginationStatus(isPaginating = false, hasMoreToLoad = true))
|
||||
|
||||
override val timelineItems: Flow<List<MatrixTimelineItem>> = combine(
|
||||
_timelineItems,
|
||||
backPaginationStatus.map { it.hasMoreToLoad }.distinctUntilChanged()
|
||||
) { timelineItems, hasMoreToLoadBackward ->
|
||||
timelineItems
|
||||
.let { items -> encryptedHistoryPostProcessor.process(items) }
|
||||
.let { items ->
|
||||
roomBeginningPostProcessor.process(
|
||||
items = items,
|
||||
isDm = matrixRoom.isDm,
|
||||
hasMoreToLoadBackwards = hasMoreToLoadBackward
|
||||
)
|
||||
}.let {items -> loadingIndicatorsPostProcessor.process(items, hasMoreToLoadBackward)}
|
||||
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
inner.close()
|
||||
}
|
||||
|
||||
private suspend fun fetchMembers() = withContext(dispatcher) {
|
||||
initLatch.await()
|
||||
try {
|
||||
inner.fetchMembers()
|
||||
} catch (exception: Exception) {
|
||||
Timber.e(exception, "Error fetching members for room ${matrixRoom.roomId}")
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun postItems(items: List<TimelineItem>) = coroutineScope {
|
||||
// Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap.
|
||||
items.chunked(INITIAL_MAX_SIZE).reversed().forEach {
|
||||
ensureActive()
|
||||
timelineDiffProcessor.postItems(it)
|
||||
}
|
||||
isInit.set(true)
|
||||
initLatch.complete(Unit)
|
||||
}
|
||||
|
||||
private suspend fun postDiffs(diffs: List<TimelineDiff>) {
|
||||
initLatch.await()
|
||||
timelineDiffProcessor.postDiffs(diffs)
|
||||
}
|
||||
}
|
||||
|
|
@ -18,16 +18,14 @@ package io.element.android.libraries.matrix.impl.timeline
|
|||
|
||||
import io.element.android.libraries.matrix.api.core.EventId
|
||||
import io.element.android.libraries.matrix.api.room.MatrixRoom
|
||||
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
|
||||
import io.element.android.libraries.matrix.api.timeline.MatrixTimelineItem
|
||||
import io.element.android.libraries.matrix.api.timeline.ReceiptType
|
||||
import io.element.android.libraries.matrix.api.timeline.TimelineException
|
||||
import io.element.android.libraries.matrix.api.timeline.item.virtual.VirtualTimelineItem
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.event.EventMessageMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.event.EventTimelineItemMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.event.TimelineEventContentMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.virtual.VirtualTimelineItemMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.postprocessor.DmBeginningTimelineProcessor
|
||||
import io.element.android.libraries.matrix.impl.timeline.postprocessor.RoomBeginningPostProcessor
|
||||
import io.element.android.libraries.matrix.impl.timeline.postprocessor.TimelineEncryptedHistoryPostProcessor
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
|
|
@ -39,7 +37,6 @@ import kotlinx.coroutines.flow.Flow
|
|||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.mapLatest
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
|
|
@ -50,224 +47,9 @@ import org.matrix.rustcomponents.sdk.Timeline
|
|||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineItem
|
||||
import timber.log.Timber
|
||||
import uniffi.matrix_sdk_ui.BackPaginationStatus
|
||||
import uniffi.matrix_sdk_ui.EventItemOrigin
|
||||
import java.util.Date
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
private const val INITIAL_MAX_SIZE = 50
|
||||
|
||||
class RustMatrixTimeline(
|
||||
roomCoroutineScope: CoroutineScope,
|
||||
isKeyBackupEnabled: Boolean,
|
||||
private val matrixRoom: MatrixRoom,
|
||||
private val innerTimeline: Timeline,
|
||||
private val dispatcher: CoroutineDispatcher,
|
||||
lastLoginTimestamp: Date?,
|
||||
private val onNewSyncedEvent: () -> Unit,
|
||||
) : MatrixTimeline {
|
||||
private val initLatch = CompletableDeferred<Unit>()
|
||||
private val isInit = AtomicBoolean(false)
|
||||
|
||||
private val _timelineItems: MutableStateFlow<List<MatrixTimelineItem>> =
|
||||
MutableStateFlow(emptyList())
|
||||
|
||||
private val _paginationState = MutableStateFlow(
|
||||
MatrixTimeline.PaginationState.Initial
|
||||
)
|
||||
|
||||
private val encryptedHistoryPostProcessor = TimelineEncryptedHistoryPostProcessor(
|
||||
lastLoginTimestamp = lastLoginTimestamp,
|
||||
isRoomEncrypted = matrixRoom.isEncrypted,
|
||||
isKeyBackupEnabled = isKeyBackupEnabled,
|
||||
dispatcher = dispatcher,
|
||||
)
|
||||
|
||||
private val dmBeginningTimelineProcessor = DmBeginningTimelineProcessor()
|
||||
|
||||
private val timelineItemFactory = MatrixTimelineItemMapper(
|
||||
fetchDetailsForEvent = this::fetchDetailsForEvent,
|
||||
roomCoroutineScope = roomCoroutineScope,
|
||||
virtualTimelineItemMapper = VirtualTimelineItemMapper(),
|
||||
eventTimelineItemMapper = EventTimelineItemMapper(
|
||||
contentMapper = TimelineEventContentMapper(
|
||||
eventMessageMapper = EventMessageMapper()
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
private val timelineDiffProcessor = MatrixTimelineDiffProcessor(
|
||||
timelineItems = _timelineItems,
|
||||
timelineItemFactory = timelineItemFactory,
|
||||
)
|
||||
|
||||
override val paginationState: StateFlow<MatrixTimeline.PaginationState> = _paginationState.asStateFlow()
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
override val timelineItems: Flow<List<MatrixTimelineItem>> = _timelineItems
|
||||
.mapLatest { items -> encryptedHistoryPostProcessor.process(items) }
|
||||
.mapLatest { items ->
|
||||
dmBeginningTimelineProcessor.process(
|
||||
items = items,
|
||||
isDm = matrixRoom.isDirect && matrixRoom.isOneToOne,
|
||||
isAtStartOfTimeline = paginationState.value.beginningOfRoomReached
|
||||
)
|
||||
}
|
||||
|
||||
override val membershipChangeEventReceived: Flow<Unit> = timelineDiffProcessor.membershipChangeEventReceived
|
||||
|
||||
init {
|
||||
Timber.d("Initialize timeline for room ${matrixRoom.roomId}")
|
||||
|
||||
roomCoroutineScope.launch(dispatcher) {
|
||||
innerTimeline.timelineDiffFlow { initialList ->
|
||||
postItems(initialList)
|
||||
}.onEach { diffs ->
|
||||
if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) {
|
||||
onNewSyncedEvent()
|
||||
}
|
||||
postDiffs(diffs)
|
||||
}.launchIn(this)
|
||||
|
||||
paginationStateFlow()
|
||||
.onEach {
|
||||
_paginationState.value = it
|
||||
}
|
||||
.launchIn(this)
|
||||
|
||||
fetchMembers()
|
||||
}
|
||||
}
|
||||
|
||||
private fun paginationStateFlow(): Flow<MatrixTimeline.PaginationState> {
|
||||
return combine(
|
||||
innerTimeline.backPaginationStatusFlow(),
|
||||
timelineItems,
|
||||
) { paginationStatus, filteredItems ->
|
||||
if (filteredItems.hasEncryptionHistoryBanner()) {
|
||||
return@combine MatrixTimeline.PaginationState(
|
||||
isBackPaginating = false,
|
||||
hasMoreToLoadBackwards = false,
|
||||
beginningOfRoomReached = false,
|
||||
)
|
||||
}
|
||||
when (paginationStatus) {
|
||||
BackPaginationStatus.IDLE -> {
|
||||
MatrixTimeline.PaginationState(
|
||||
isBackPaginating = false,
|
||||
hasMoreToLoadBackwards = true,
|
||||
beginningOfRoomReached = false,
|
||||
)
|
||||
}
|
||||
BackPaginationStatus.PAGINATING -> {
|
||||
MatrixTimeline.PaginationState(
|
||||
isBackPaginating = true,
|
||||
hasMoreToLoadBackwards = true,
|
||||
beginningOfRoomReached = false,
|
||||
)
|
||||
}
|
||||
BackPaginationStatus.TIMELINE_START_REACHED -> {
|
||||
MatrixTimeline.PaginationState(
|
||||
isBackPaginating = false,
|
||||
hasMoreToLoadBackwards = false,
|
||||
beginningOfRoomReached = true,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun fetchMembers() = withContext(dispatcher) {
|
||||
initLatch.await()
|
||||
try {
|
||||
innerTimeline.fetchMembers()
|
||||
} catch (exception: Exception) {
|
||||
Timber.e(exception, "Error fetching members for room ${matrixRoom.roomId}")
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun postItems(items: List<TimelineItem>) = coroutineScope {
|
||||
// Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap.
|
||||
items.chunked(INITIAL_MAX_SIZE).reversed().forEach {
|
||||
ensureActive()
|
||||
timelineDiffProcessor.postItems(it)
|
||||
}
|
||||
isInit.set(true)
|
||||
initLatch.complete(Unit)
|
||||
}
|
||||
|
||||
private suspend fun postDiffs(diffs: List<TimelineDiff>) {
|
||||
initLatch.await()
|
||||
timelineDiffProcessor.postDiffs(diffs)
|
||||
}
|
||||
|
||||
override suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> = withContext(dispatcher) {
|
||||
runCatching {
|
||||
innerTimeline.fetchDetailsForEvent(eventId.value)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun paginateBackwards(requestSize: Int): Result<Unit> {
|
||||
val paginationOptions = PaginationOptions.SimpleRequest(
|
||||
eventLimit = requestSize.toUShort(),
|
||||
waitForToken = true,
|
||||
)
|
||||
return paginateBackwards(paginationOptions)
|
||||
}
|
||||
|
||||
override suspend fun paginateBackwards(requestSize: Int, untilNumberOfItems: Int): Result<Unit> {
|
||||
val paginationOptions = PaginationOptions.UntilNumItems(
|
||||
eventLimit = requestSize.toUShort(),
|
||||
items = untilNumberOfItems.toUShort(),
|
||||
waitForToken = true,
|
||||
)
|
||||
return paginateBackwards(paginationOptions)
|
||||
}
|
||||
|
||||
private suspend fun paginateBackwards(paginationOptions: PaginationOptions): Result<Unit> = withContext(dispatcher) {
|
||||
initLatch.await()
|
||||
runCatching {
|
||||
if (!canBackPaginate()) throw TimelineException.CannotPaginate
|
||||
Timber.v("Start back paginating for room ${matrixRoom.roomId} ")
|
||||
innerTimeline.paginateBackwards(paginationOptions)
|
||||
}.onFailure { error ->
|
||||
if (error is TimelineException.CannotPaginate) {
|
||||
Timber.d("Can't paginate backwards on room ${matrixRoom.roomId}, we're already at the start")
|
||||
} else {
|
||||
Timber.e(error, "Error paginating backwards on room ${matrixRoom.roomId}")
|
||||
}
|
||||
}.onSuccess {
|
||||
Timber.v("Success back paginating for room ${matrixRoom.roomId}")
|
||||
}
|
||||
}
|
||||
|
||||
private fun canBackPaginate(): Boolean {
|
||||
return isInit.get() && paginationState.value.canBackPaginate
|
||||
}
|
||||
|
||||
override suspend fun sendReadReceipt(
|
||||
eventId: EventId,
|
||||
receiptType: ReceiptType,
|
||||
) = withContext(dispatcher) {
|
||||
runCatching {
|
||||
innerTimeline.sendReadReceipt(
|
||||
receiptType = receiptType.toRustReceiptType(),
|
||||
eventId = eventId.value,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
innerTimeline.close()
|
||||
}
|
||||
|
||||
fun getItemById(eventId: EventId): MatrixTimelineItem.Event? {
|
||||
return _timelineItems.value.firstOrNull { (it as? MatrixTimelineItem.Event)?.eventId == eventId } as? MatrixTimelineItem.Event
|
||||
}
|
||||
|
||||
private fun List<MatrixTimelineItem>.hasEncryptionHistoryBanner(): Boolean {
|
||||
val firstItem = firstOrNull()
|
||||
return firstItem is MatrixTimelineItem.Virtual &&
|
||||
firstItem.virtual is VirtualTimelineItem.EncryptedHistoryBanner
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Copyright (c) 2024 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.timeline
|
||||
|
||||
import io.element.android.libraries.matrix.api.timeline.Timeline
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import uniffi.matrix_sdk_ui.PaginationStatus
|
||||
|
||||
fun Flow<PaginationStatus>.map(): Flow<Timeline.PaginationStatus> = map { paginationStatus ->
|
||||
when (paginationStatus) {
|
||||
PaginationStatus.IDLE -> Timeline.PaginationStatus(
|
||||
isPaginating = false,
|
||||
hasMoreToLoad = true
|
||||
)
|
||||
PaginationStatus.PAGINATING -> Timeline.PaginationStatus(
|
||||
isPaginating = true,
|
||||
hasMoreToLoad = true
|
||||
)
|
||||
PaginationStatus.TIMELINE_END_REACHED -> Timeline.PaginationStatus(
|
||||
isPaginating = false,
|
||||
hasMoreToLoad = false
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Copyright (c) 2024 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.timeline.postprocessor
|
||||
|
||||
import io.element.android.libraries.matrix.api.timeline.MatrixTimelineItem
|
||||
import io.element.android.libraries.matrix.api.timeline.item.virtual.VirtualTimelineItem
|
||||
|
||||
fun List<MatrixTimelineItem>.hasEncryptionHistoryBanner(): Boolean {
|
||||
val firstItem = firstOrNull()
|
||||
return firstItem is MatrixTimelineItem.Virtual &&
|
||||
firstItem.virtual is VirtualTimelineItem.EncryptedHistoryBanner
|
||||
}
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright (c) 2024 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.timeline.postprocessor
|
||||
|
||||
import io.element.android.libraries.matrix.api.timeline.MatrixTimelineItem
|
||||
import io.element.android.libraries.matrix.api.timeline.item.virtual.VirtualTimelineItem
|
||||
import io.element.android.services.toolbox.api.systemclock.SystemClock
|
||||
|
||||
class LoadingIndicatorsPostProcessor(private val systemClock: SystemClock) {
|
||||
|
||||
fun process(
|
||||
items: List<MatrixTimelineItem>,
|
||||
hasMoreToLoadBackwards: Boolean,
|
||||
): List<MatrixTimelineItem> {
|
||||
return if (hasMoreToLoadBackwards && !items.hasEncryptionHistoryBanner()){
|
||||
listOf(
|
||||
MatrixTimelineItem.Virtual(
|
||||
uniqueId = "BackwardLoadingIndicator",
|
||||
virtual = VirtualTimelineItem.LoadingIndicator(
|
||||
backwards = true,
|
||||
timestamp = systemClock.epochMillis()
|
||||
)
|
||||
)
|
||||
) + items
|
||||
}else {
|
||||
items
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -21,17 +21,36 @@ import io.element.android.libraries.matrix.api.timeline.item.event.MembershipCha
|
|||
import io.element.android.libraries.matrix.api.timeline.item.event.OtherState
|
||||
import io.element.android.libraries.matrix.api.timeline.item.event.RoomMembershipContent
|
||||
import io.element.android.libraries.matrix.api.timeline.item.event.StateContent
|
||||
import io.element.android.libraries.matrix.api.timeline.item.virtual.VirtualTimelineItem
|
||||
|
||||
/**
|
||||
* This timeline post-processor removes the room creation event and the self-join event from the timeline for DMs.
|
||||
* This timeline post-processor removes the room creation event and the self-join event from the timeline for DMs
|
||||
* or add the RoomBeginning item for non DM room.
|
||||
*/
|
||||
class DmBeginningTimelineProcessor {
|
||||
class RoomBeginningPostProcessor {
|
||||
|
||||
fun process(
|
||||
items: List<MatrixTimelineItem>,
|
||||
isDm: Boolean,
|
||||
isAtStartOfTimeline: Boolean
|
||||
hasMoreToLoadBackwards: Boolean
|
||||
): List<MatrixTimelineItem> {
|
||||
if (!isDm || !isAtStartOfTimeline) return items
|
||||
return when {
|
||||
hasMoreToLoadBackwards -> items
|
||||
isDm -> processForDM(items)
|
||||
else -> processForRoom(items)
|
||||
}
|
||||
}
|
||||
|
||||
private fun processForRoom(items: List<MatrixTimelineItem>): List<MatrixTimelineItem> {
|
||||
if (items.hasEncryptionHistoryBanner()) return items
|
||||
val roomBeginningItem = MatrixTimelineItem.Virtual(
|
||||
uniqueId = VirtualTimelineItem.RoomBeginning.toString(),
|
||||
virtual = VirtualTimelineItem.RoomBeginning
|
||||
)
|
||||
return listOf(roomBeginningItem) + items
|
||||
}
|
||||
|
||||
private fun processForDM(items: List<MatrixTimelineItem>): List<MatrixTimelineItem> {
|
||||
|
||||
// Find room creation event. This is usually index 0
|
||||
val roomCreationEventIndex = items.indexOfFirst {
|
||||
|
|
@ -35,8 +35,8 @@ class DmBeginningTimelineProcessorTest {
|
|||
MatrixTimelineItem.Event("m.room.create", anEventTimelineItem(sender = A_USER_ID, content = StateContent("", OtherState.RoomCreate))),
|
||||
MatrixTimelineItem.Event("m.room.member", anEventTimelineItem(content = RoomMembershipContent(A_USER_ID, MembershipChange.JOINED))),
|
||||
)
|
||||
val processor = DmBeginningTimelineProcessor()
|
||||
val processedItems = processor.process(timelineItems, isDm = true, isAtStartOfTimeline = true)
|
||||
val processor = RoomBeginningPostProcessor()
|
||||
val processedItems = processor.process(timelineItems, isDm = true, hasMoreToLoadBackwards = true)
|
||||
assertThat(processedItems).isEmpty()
|
||||
}
|
||||
|
||||
|
|
@ -52,8 +52,8 @@ class DmBeginningTimelineProcessorTest {
|
|||
MatrixTimelineItem.Event("m.room.member_other", anEventTimelineItem(content = RoomMembershipContent(A_USER_ID_2, MembershipChange.JOINED))),
|
||||
MatrixTimelineItem.Event("m.room.message", anEventTimelineItem(content = aMessageContent("hi"))),
|
||||
)
|
||||
val processor = DmBeginningTimelineProcessor()
|
||||
val processedItems = processor.process(timelineItems, isDm = true, isAtStartOfTimeline = true)
|
||||
val processor = RoomBeginningPostProcessor()
|
||||
val processedItems = processor.process(timelineItems, isDm = true, hasMoreToLoadBackwards = true)
|
||||
assertThat(processedItems).isEqualTo(expected)
|
||||
}
|
||||
|
||||
|
|
@ -63,8 +63,8 @@ class DmBeginningTimelineProcessorTest {
|
|||
MatrixTimelineItem.Event("m.room.create", anEventTimelineItem(sender = A_USER_ID, content = StateContent("", OtherState.RoomCreate))),
|
||||
MatrixTimelineItem.Event("m.room.member", anEventTimelineItem(content = RoomMembershipContent(A_USER_ID, MembershipChange.JOINED))),
|
||||
)
|
||||
val processor = DmBeginningTimelineProcessor()
|
||||
val processedItems = processor.process(timelineItems, isDm = false, isAtStartOfTimeline = true)
|
||||
val processor = RoomBeginningPostProcessor()
|
||||
val processedItems = processor.process(timelineItems, isDm = false, hasMoreToLoadBackwards = true)
|
||||
assertThat(processedItems).isEqualTo(timelineItems)
|
||||
}
|
||||
|
||||
|
|
@ -74,8 +74,8 @@ class DmBeginningTimelineProcessorTest {
|
|||
MatrixTimelineItem.Event("m.room.create", anEventTimelineItem(sender = A_USER_ID, content = StateContent("", OtherState.RoomCreate))),
|
||||
MatrixTimelineItem.Event("m.room.member", anEventTimelineItem(content = RoomMembershipContent(A_USER_ID, MembershipChange.JOINED))),
|
||||
)
|
||||
val processor = DmBeginningTimelineProcessor()
|
||||
val processedItems = processor.process(timelineItems, isDm = true, isAtStartOfTimeline = false)
|
||||
val processor = RoomBeginningPostProcessor()
|
||||
val processedItems = processor.process(timelineItems, isDm = true, hasMoreToLoadBackwards = false)
|
||||
assertThat(processedItems).isEqualTo(timelineItems)
|
||||
}
|
||||
|
||||
|
|
@ -84,8 +84,8 @@ class DmBeginningTimelineProcessorTest {
|
|||
val timelineItems = listOf(
|
||||
MatrixTimelineItem.Event("m.room.member", anEventTimelineItem(content = RoomMembershipContent(A_USER_ID, MembershipChange.JOINED))),
|
||||
)
|
||||
val processor = DmBeginningTimelineProcessor()
|
||||
val processedItems = processor.process(timelineItems, isDm = true, isAtStartOfTimeline = false)
|
||||
val processor = RoomBeginningPostProcessor()
|
||||
val processedItems = processor.process(timelineItems, isDm = true, hasMoreToLoadBackwards = false)
|
||||
assertThat(processedItems).isEqualTo(timelineItems)
|
||||
}
|
||||
|
||||
|
|
@ -95,8 +95,8 @@ class DmBeginningTimelineProcessorTest {
|
|||
MatrixTimelineItem.Event("m.room.create", anEventTimelineItem(sender = A_USER_ID, content = StateContent("", OtherState.RoomCreate))),
|
||||
MatrixTimelineItem.Event("m.room.member", anEventTimelineItem(content = RoomMembershipContent(A_USER_ID_2, MembershipChange.JOINED))),
|
||||
)
|
||||
val processor = DmBeginningTimelineProcessor()
|
||||
val processedItems = processor.process(timelineItems, isDm = true, isAtStartOfTimeline = false)
|
||||
val processor = RoomBeginningPostProcessor()
|
||||
val processedItems = processor.process(timelineItems, isDm = true, hasMoreToLoadBackwards = false)
|
||||
assertThat(processedItems).isEqualTo(timelineItems)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue