PollHistory : simplify so we only have one Node. Also enrich PollHistoryState.

This commit is contained in:
ganfra 2023-12-06 19:27:50 +01:00
parent 4a2cbb1ed4
commit aa9693126f
19 changed files with 376 additions and 255 deletions

View file

@ -41,6 +41,7 @@ import io.element.android.libraries.matrix.api.room.StateEventType
import io.element.android.libraries.matrix.api.room.location.AssetType
import io.element.android.libraries.matrix.api.room.roomMembers
import io.element.android.libraries.matrix.api.room.roomNotificationSettings
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
import io.element.android.libraries.matrix.api.widget.MatrixWidgetDriver
import io.element.android.libraries.matrix.api.widget.MatrixWidgetSettings
import io.element.android.libraries.matrix.impl.core.toProgressWatcher
@ -50,6 +51,7 @@ import io.element.android.libraries.matrix.impl.media.toMSC3246range
import io.element.android.libraries.matrix.impl.notificationsettings.RustNotificationSettingsService
import io.element.android.libraries.matrix.impl.poll.toInner
import io.element.android.libraries.matrix.impl.room.location.toInner
import io.element.android.libraries.matrix.impl.timeline.AsyncMatrixTimeline
import io.element.android.libraries.matrix.impl.timeline.RustMatrixTimeline
import io.element.android.libraries.matrix.impl.util.destroyAll
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
@ -70,14 +72,12 @@ import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.EventTimelineItem
import org.matrix.rustcomponents.sdk.Room
import org.matrix.rustcomponents.sdk.RoomInfo
import org.matrix.rustcomponents.sdk.RoomInfoListener
import org.matrix.rustcomponents.sdk.RoomListItem
import org.matrix.rustcomponents.sdk.RoomMember
import org.matrix.rustcomponents.sdk.RoomMessageEventContentWithoutRelation
import org.matrix.rustcomponents.sdk.SendAttachmentJoinHandle
import org.matrix.rustcomponents.sdk.Timeline
import org.matrix.rustcomponents.sdk.WidgetCapabilities
import org.matrix.rustcomponents.sdk.WidgetCapabilitiesProvider
import org.matrix.rustcomponents.sdk.messageEventContentFromHtml
@ -85,14 +85,16 @@ import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown
import org.matrix.rustcomponents.sdk.use
import timber.log.Timber
import java.io.File
import org.matrix.rustcomponents.sdk.Room as InnerRoom
import org.matrix.rustcomponents.sdk.Timeline as InnerTimeline
@OptIn(ExperimentalCoroutinesApi::class)
class RustMatrixRoom(
override val sessionId: SessionId,
private val isKeyBackupEnabled: Boolean,
private val roomListItem: RoomListItem,
private val innerRoom: Room,
private val innerTimeline: Timeline,
private val innerRoom: InnerRoom,
private val innerTimeline: InnerTimeline,
private val roomNotificationSettingsService: RustNotificationSettingsService,
sessionCoroutineScope: CoroutineScope,
private val coroutineDispatchers: CoroutineDispatchers,
@ -130,15 +132,9 @@ class RustMatrixRoom(
private val _roomNotificationSettingsStateFlow = MutableStateFlow<MatrixRoomNotificationSettingsState>(MatrixRoomNotificationSettingsState.Unknown)
override val roomNotificationSettingsStateFlow: StateFlow<MatrixRoomNotificationSettingsState> = _roomNotificationSettingsStateFlow
override val timeline = RustMatrixTimeline(
isKeyBackupEnabled = isKeyBackupEnabled,
matrixRoom = this,
innerTimeline = innerTimeline,
roomCoroutineScope = roomCoroutineScope,
dispatcher = roomDispatcher,
lastLoginTimestamp = sessionData.loginTimestamp,
onNewSyncedEvent = { _syncUpdateFlow.value = systemClock.epochMillis() }
)
override val timeline = createMatrixTimeline(innerTimeline) {
_syncUpdateFlow.value = systemClock.epochMillis()
}
override val membersStateFlow: StateFlow<MatrixRoomMembersState> = _membersStateFlow.asStateFlow()
@ -150,7 +146,7 @@ class RustMatrixRoom(
override fun destroy() {
roomCoroutineScope.cancel()
innerTimeline.destroy()
timeline.close()
innerRoom.destroy()
roomListItem.destroy()
specialModeEventTimelineItem?.destroy()
@ -570,22 +566,35 @@ class RustMatrixRoom(
)
}
override suspend fun pollHistory() = RustMatrixTimeline(
isKeyBackupEnabled = isKeyBackupEnabled,
matrixRoom = this,
innerTimeline = innerRoom.pollHistory(),
roomCoroutineScope = roomCoroutineScope,
dispatcher = roomDispatcher,
lastLoginTimestamp = sessionData.loginTimestamp,
onNewSyncedEvent = { _syncUpdateFlow.value = systemClock.epochMillis() }
)
override fun pollHistory() = AsyncMatrixTimeline(
coroutineScope = roomCoroutineScope,
dispatcher = roomDispatcher
) {
val innerTimeline = innerRoom.pollHistory()
createMatrixTimeline(innerTimeline)
}
private suspend fun sendAttachment(files: List<File>, handle: () -> SendAttachmentJoinHandle): Result<MediaUploadHandler> {
private fun sendAttachment(files: List<File>, handle: () -> SendAttachmentJoinHandle): Result<MediaUploadHandler> {
return runCatching {
MediaUploadHandlerImpl(files, handle())
}
}
private fun createMatrixTimeline(
timeline: InnerTimeline,
onNewSyncedEvent: () -> Unit = {},
): MatrixTimeline {
return RustMatrixTimeline(
isKeyBackupEnabled = isKeyBackupEnabled,
matrixRoom = this,
roomCoroutineScope = roomCoroutineScope,
dispatcher = roomDispatcher,
lastLoginTimestamp = sessionData.loginTimestamp,
onNewSyncedEvent = onNewSyncedEvent,
innerTimeline = timeline,
)
}
private fun messageEventContentFromParts(body: String, htmlBody: String?): RoomMessageEventContentWithoutRelation =
if (htmlBody != null) {
messageEventContentFromHtml(body, htmlBody)

View file

@ -0,0 +1,95 @@
/*
* Copyright (c) 2023 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.element.android.libraries.matrix.impl.timeline
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
import io.element.android.libraries.matrix.api.timeline.MatrixTimelineItem
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import timber.log.Timber
/**
* This class is a wrapper around a [MatrixTimeline] that will be created asynchronously.
*/
class AsyncMatrixTimeline(
coroutineScope: CoroutineScope,
dispatcher: CoroutineDispatcher,
private val timelineProvider: suspend () -> MatrixTimeline
) : MatrixTimeline {
private val _timelineItems: MutableStateFlow<List<MatrixTimelineItem>> =
MutableStateFlow(emptyList())
private val _paginationState = MutableStateFlow(
MatrixTimeline.PaginationState()
)
private val timeline = coroutineScope.async(context = dispatcher, start = CoroutineStart.LAZY) {
timelineProvider()
}
private val closeSignal = CompletableDeferred<Unit>()
init {
coroutineScope.launch {
val delegateTimeline = timeline.await()
delegateTimeline.timelineItems
.onEach { _timelineItems.value = it }
.launchIn(this)
delegateTimeline.paginationState
.onEach { _paginationState.value = it }
.launchIn(this)
launch {
withContext(NonCancellable) {
closeSignal.await()
Timber.d("Close delegate")
delegateTimeline.close()
}
}
}
}
override val paginationState: StateFlow<MatrixTimeline.PaginationState> = _paginationState
override val timelineItems: Flow<List<MatrixTimelineItem>> = _timelineItems
override suspend fun paginateBackwards(requestSize: Int, untilNumberOfItems: Int): Result<Unit> {
return timeline.await().paginateBackwards(requestSize, untilNumberOfItems)
}
override suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> {
return timeline.await().fetchDetailsForEvent(eventId)
}
override suspend fun sendReadReceipt(eventId: EventId): Result<Unit> {
return timeline.await().sendReadReceipt(eventId)
}
override fun close() {
closeSignal.complete(Unit)
}
}

View file

@ -72,11 +72,7 @@ class RustMatrixTimeline(
MutableStateFlow(emptyList())
private val _paginationState = MutableStateFlow(
MatrixTimeline.PaginationState(
hasMoreToLoadBackwards = true,
isBackPaginating = false,
beginningOfRoomReached = false,
)
MatrixTimeline.PaginationState()
)
private val encryptedHistoryPostProcessor = TimelineEncryptedHistoryPostProcessor(