Feature/fga/sync states (#1042)

* Change RoomSummaryDataSource to RoomListService to better reflects the rust api

* Better Sync management

* Sync: improve sync spinner rendering

* Sync: make test compiles

* Sync: add more test for sync spinner

* Sync: more clean-up

* Sync: pr review

---------

Co-authored-by: ganfra <francoisg@element.io>
This commit is contained in:
ganfra 2023-08-09 14:37:43 +02:00 committed by GitHub
parent 2131af28d5
commit fa51f6eaa7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
44 changed files with 547 additions and 356 deletions

View file

@ -32,8 +32,8 @@ import io.element.android.libraries.matrix.api.notification.NotificationService
import io.element.android.libraries.matrix.api.pusher.PushersService
import io.element.android.libraries.matrix.api.room.MatrixRoom
import io.element.android.libraries.matrix.api.room.RoomMembershipObserver
import io.element.android.libraries.matrix.api.room.RoomSummaryDataSource
import io.element.android.libraries.matrix.api.room.awaitAllRoomsAreLoaded
import io.element.android.libraries.matrix.api.roomlist.RoomListService
import io.element.android.libraries.matrix.api.roomlist.awaitLoaded
import io.element.android.libraries.matrix.api.sync.SyncService
import io.element.android.libraries.matrix.api.sync.SyncState
import io.element.android.libraries.matrix.api.user.MatrixSearchUserResults
@ -45,9 +45,8 @@ import io.element.android.libraries.matrix.impl.notification.RustNotificationSer
import io.element.android.libraries.matrix.impl.pushers.RustPushersService
import io.element.android.libraries.matrix.impl.room.RoomContentForwarder
import io.element.android.libraries.matrix.impl.room.RustMatrixRoom
import io.element.android.libraries.matrix.impl.room.RustRoomSummaryDataSource
import io.element.android.libraries.matrix.impl.room.roomOrNull
import io.element.android.libraries.matrix.impl.room.stateFlow
import io.element.android.libraries.matrix.impl.roomlist.RustRoomListService
import io.element.android.libraries.matrix.impl.roomlist.roomOrNull
import io.element.android.libraries.matrix.impl.sync.RustSyncService
import io.element.android.libraries.matrix.impl.usersearch.UserProfileMapper
import io.element.android.libraries.matrix.impl.usersearch.UserSearchResultMapper
@ -90,11 +89,11 @@ class RustMatrixClient constructor(
) : MatrixClient {
override val sessionId: UserId = UserId(client.userId())
private val roomListService = syncService.roomListService()
private val innerRoomListService = syncService.roomListService()
private val sessionDispatcher = dispatchers.io.limitedParallelism(64)
private val sessionCoroutineScope = appCoroutineScope.childScope(dispatchers.main, "Session-${sessionId}")
private val verificationService = RustSessionVerificationService()
private val rustSyncService = RustSyncService(syncService, roomListService.stateFlow(), sessionCoroutineScope)
private val rustSyncService = RustSyncService(syncService, sessionCoroutineScope)
private val pushersService = RustPushersService(
client = client,
dispatchers = dispatchers,
@ -122,15 +121,15 @@ class RustMatrixClient constructor(
}
}
private val rustRoomSummaryDataSource: RustRoomSummaryDataSource =
RustRoomSummaryDataSource(
roomListService = roomListService,
private val rustRoomListService: RoomListService =
RustRoomListService(
innerRoomListService = innerRoomListService,
sessionCoroutineScope = sessionCoroutineScope,
dispatcher = sessionDispatcher,
)
override val roomSummaryDataSource: RoomSummaryDataSource
get() = rustRoomSummaryDataSource
override val roomListService: RoomListService
get() = rustRoomListService
private val rustMediaLoader = RustMediaLoader(baseCacheDirectory, dispatchers, client)
override val mediaLoader: MatrixMediaLoader
@ -138,7 +137,7 @@ class RustMatrixClient constructor(
private val roomMembershipObserver = RoomMembershipObserver()
private val roomContentForwarder = RoomContentForwarder(roomListService)
private val roomContentForwarder = RoomContentForwarder(innerRoomListService)
init {
client.setDelegate(clientDelegate)
@ -156,7 +155,7 @@ class RustMatrixClient constructor(
var cachedPairOfRoom = pairOfRoom(roomId)
if (cachedPairOfRoom == null) {
//... otherwise, lets wait for the SS to load all rooms and check again.
roomSummaryDataSource.awaitAllRoomsAreLoaded()
roomListService.allRooms().awaitLoaded()
cachedPairOfRoom = pairOfRoom(roomId)
}
cachedPairOfRoom?.let { (roomListItem, fullRoom) ->
@ -174,7 +173,7 @@ class RustMatrixClient constructor(
}
private fun pairOfRoom(roomId: RoomId): Pair<RoomListItem, Room>? {
val cachedRoomListItem = roomListService.roomOrNull(roomId.value)
val cachedRoomListItem = innerRoomListService.roomOrNull(roomId.value)
val fullRoom = cachedRoomListItem?.fullRoom()
return if (cachedRoomListItem == null || fullRoom == null) {
Timber.d("No room cached for $roomId")
@ -225,7 +224,7 @@ class RustMatrixClient constructor(
// Wait to receive the room back from the sync
withTimeout(30_000L) {
roomSummaryDataSource.allRooms()
roomListService.allRooms().summaries
.filter { roomSummaries ->
roomSummaries.map { it.identifier() }.contains(roomId.value)
}
@ -273,7 +272,7 @@ class RustMatrixClient constructor(
client.setDelegate(null)
verificationService.destroy()
syncService.destroy()
roomListService.destroy()
innerRoomListService.destroy()
notificationClient.destroy()
client.destroy()
}

View file

@ -23,7 +23,7 @@ import io.element.android.libraries.di.SessionScope
import io.element.android.libraries.matrix.api.MatrixClient
import io.element.android.libraries.matrix.api.media.MatrixMediaLoader
import io.element.android.libraries.matrix.api.room.RoomMembershipObserver
import io.element.android.libraries.matrix.api.room.RoomSummaryDataSource
import io.element.android.libraries.matrix.api.roomlist.RoomListService
import io.element.android.libraries.matrix.api.verification.SessionVerificationService
@Module
@ -40,8 +40,8 @@ object SessionMatrixModule {
}
@Provides
fun provideRoomSummaryDataSource(matrixClient: MatrixClient): RoomSummaryDataSource {
return matrixClient.roomSummaryDataSource
fun providesRoomListService(matrixClient: MatrixClient): RoomListService {
return matrixClient.roomListService
}
@Provides

View file

@ -20,6 +20,7 @@ import io.element.android.libraries.core.coroutine.parallelMap
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.room.ForwardEventException
import io.element.android.libraries.matrix.impl.roomlist.roomOrNull
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.withTimeout
import org.matrix.rustcomponents.sdk.Room

View file

@ -1,123 +0,0 @@
/*
* Copyright (c) 2023 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.element.android.libraries.matrix.impl.room
import io.element.android.libraries.matrix.api.room.RoomSummary
import io.element.android.libraries.matrix.api.room.RoomSummaryDataSource
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import org.matrix.rustcomponents.sdk.RoomList
import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate
import org.matrix.rustcomponents.sdk.RoomListException
import org.matrix.rustcomponents.sdk.RoomListInput
import org.matrix.rustcomponents.sdk.RoomListLoadingState
import org.matrix.rustcomponents.sdk.RoomListRange
import org.matrix.rustcomponents.sdk.RoomListService
import org.matrix.rustcomponents.sdk.RoomListServiceState
import timber.log.Timber
internal class RustRoomSummaryDataSource(
private val roomListService: RoomListService,
private val sessionCoroutineScope: CoroutineScope,
dispatcher: CoroutineDispatcher,
roomSummaryDetailsFactory: RoomSummaryDetailsFactory = RoomSummaryDetailsFactory(),
) : RoomSummaryDataSource {
private val allRooms = MutableStateFlow<List<RoomSummary>>(emptyList())
private val inviteRooms = MutableStateFlow<List<RoomSummary>>(emptyList())
private val allRoomsLoadingState: MutableStateFlow<RoomSummaryDataSource.LoadingState> = MutableStateFlow(RoomSummaryDataSource.LoadingState.NotLoaded)
private val allRoomsListProcessor = RoomSummaryListProcessor(allRooms, roomListService, roomSummaryDetailsFactory, shouldFetchFullRoom = false)
private val inviteRoomsListProcessor = RoomSummaryListProcessor(inviteRooms, roomListService, roomSummaryDetailsFactory, shouldFetchFullRoom = true)
init {
sessionCoroutineScope.launch(dispatcher) {
val allRooms = roomListService.allRooms()
allRooms
.observeEntriesWithProcessor(allRoomsListProcessor)
.launchIn(this)
allRooms
.loadingStateFlow()
.map { it.toRoomSummaryDataSourceLoadingState() }
.onEach {
allRoomsLoadingState.value = it
}
.launchIn(this)
launch {
// Wait until running, as invites is only available after that
roomListService.stateFlow().first {
it == RoomListServiceState.RUNNING
}
roomListService.invites()
.observeEntriesWithProcessor(inviteRoomsListProcessor)
.launchIn(this)
}
}
}
override fun allRooms(): StateFlow<List<RoomSummary>> {
return allRooms
}
override fun inviteRooms(): StateFlow<List<RoomSummary>> {
return inviteRooms
}
override fun allRoomsLoadingState(): StateFlow<RoomSummaryDataSource.LoadingState> {
return allRoomsLoadingState
}
override fun updateAllRoomsVisibleRange(range: IntRange) {
Timber.v("setVisibleRange=$range")
sessionCoroutineScope.launch {
try {
val ranges = listOf(RoomListRange(range.first.toUInt(), range.last.toUInt()))
roomListService.applyInput(
RoomListInput.Viewport(ranges)
)
} catch (exception: RoomListException) {
Timber.e(exception, "Failed updating visible range")
}
}
}
}
private fun RoomListLoadingState.toRoomSummaryDataSourceLoadingState(): RoomSummaryDataSource.LoadingState {
return when (this) {
is RoomListLoadingState.Loaded -> RoomSummaryDataSource.LoadingState.Loaded(maximumNumberOfRooms?.toInt() ?: 0)
is RoomListLoadingState.NotLoaded -> RoomSummaryDataSource.LoadingState.NotLoaded
}
}
private fun RoomList.observeEntriesWithProcessor(processor: RoomSummaryListProcessor): Flow<List<RoomListEntriesUpdate>> {
return entriesFlow { roomListEntries ->
processor.postEntries(roomListEntries)
}.onEach { update ->
processor.postUpdate(update)
}
}

View file

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.element.android.libraries.matrix.impl.room
package io.element.android.libraries.matrix.impl.roomlist
import io.element.android.libraries.core.data.tryOrNull
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow

View file

@ -14,10 +14,11 @@
* limitations under the License.
*/
package io.element.android.libraries.matrix.impl.room
package io.element.android.libraries.matrix.impl.roomlist
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.room.RoomSummaryDetails
import io.element.android.libraries.matrix.api.roomlist.RoomSummaryDetails
import io.element.android.libraries.matrix.impl.room.RoomMemberMapper
import io.element.android.libraries.matrix.impl.room.message.RoomMessageFactory
import org.matrix.rustcomponents.sdk.Room
import org.matrix.rustcomponents.sdk.RoomListItem

View file

@ -14,10 +14,10 @@
* limitations under the License.
*/
package io.element.android.libraries.matrix.impl.room
package io.element.android.libraries.matrix.impl.roomlist
import io.element.android.libraries.core.coroutine.parallelMap
import io.element.android.libraries.matrix.api.room.RoomSummary
import io.element.android.libraries.matrix.api.roomlist.RoomSummary
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.sync.Mutex

View file

@ -0,0 +1,29 @@
/*
* Copyright (c) 2023 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.element.android.libraries.matrix.impl.roomlist
import io.element.android.libraries.matrix.api.roomlist.RoomList
import io.element.android.libraries.matrix.api.roomlist.RoomSummary
import kotlinx.coroutines.flow.StateFlow
/**
* Simple implementation of [RoomList] where state flows are provided through constructor.
*/
class RustRoomList(
override val summaries: StateFlow<List<RoomSummary>>,
override val loadingState: StateFlow<RoomList.LoadingState>
) : RoomList

View file

@ -0,0 +1,151 @@
/*
* Copyright (c) 2023 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.element.android.libraries.matrix.impl.roomlist
import io.element.android.libraries.matrix.api.roomlist.RoomList
import io.element.android.libraries.matrix.api.roomlist.RoomListService
import io.element.android.libraries.matrix.api.roomlist.RoomSummary
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate
import org.matrix.rustcomponents.sdk.RoomListException
import org.matrix.rustcomponents.sdk.RoomListInput
import org.matrix.rustcomponents.sdk.RoomListLoadingState
import org.matrix.rustcomponents.sdk.RoomListRange
import org.matrix.rustcomponents.sdk.RoomListServiceState
import timber.log.Timber
import org.matrix.rustcomponents.sdk.RoomListService as InnerRustRoomListService
class RustRoomListService(
private val innerRoomListService: InnerRustRoomListService,
private val sessionCoroutineScope: CoroutineScope,
dispatcher: CoroutineDispatcher,
roomSummaryDetailsFactory: RoomSummaryDetailsFactory = RoomSummaryDetailsFactory(),
) : RoomListService {
private val allRooms = MutableStateFlow<List<RoomSummary>>(emptyList())
private val inviteRooms = MutableStateFlow<List<RoomSummary>>(emptyList())
private val allRoomsLoadingState: MutableStateFlow<RoomList.LoadingState> = MutableStateFlow(RoomList.LoadingState.NotLoaded)
private val allRoomsListProcessor = RoomSummaryListProcessor(allRooms, innerRoomListService, roomSummaryDetailsFactory, shouldFetchFullRoom = false)
private val invitesLoadingState: MutableStateFlow<RoomList.LoadingState> = MutableStateFlow(RoomList.LoadingState.NotLoaded)
private val inviteRoomsListProcessor = RoomSummaryListProcessor(inviteRooms, innerRoomListService, roomSummaryDetailsFactory, shouldFetchFullRoom = true)
init {
sessionCoroutineScope.launch(dispatcher) {
val allRooms = innerRoomListService.allRooms()
allRooms
.observeEntriesWithProcessor(allRoomsListProcessor)
.launchIn(this)
allRooms
.observeLoadingState(allRoomsLoadingState)
.launchIn(this)
launch {
// Wait until running, as invites is only available after that
innerRoomListService.stateFlow().first {
it == RoomListServiceState.RUNNING
}
val invites = innerRoomListService.invites()
invites
.observeEntriesWithProcessor(inviteRoomsListProcessor)
.launchIn(this)
invites
.observeLoadingState(invitesLoadingState)
.launchIn(this)
}
}
}
override fun allRooms(): RoomList {
return RustRoomList(allRooms, allRoomsLoadingState)
}
override fun invites(): RoomList {
return RustRoomList(inviteRooms, invitesLoadingState)
}
override fun updateAllRoomsVisibleRange(range: IntRange) {
Timber.v("setVisibleRange=$range")
sessionCoroutineScope.launch {
try {
val ranges = listOf(RoomListRange(range.first.toUInt(), range.last.toUInt()))
innerRoomListService.applyInput(
RoomListInput.Viewport(ranges)
)
} catch (exception: RoomListException) {
Timber.e(exception, "Failed updating visible range")
}
}
}
override val state: StateFlow<RoomListService.State> =
innerRoomListService.stateFlow()
.map { it.toRoomListState() }
.onEach { state ->
Timber.d("RoomList state=$state")
}
.distinctUntilChanged()
.stateIn(sessionCoroutineScope, SharingStarted.Eagerly, RoomListService.State.Idle)
}
private fun RoomListLoadingState.toLoadingState(): RoomList.LoadingState {
return when (this) {
is RoomListLoadingState.Loaded -> RoomList.LoadingState.Loaded(maximumNumberOfRooms?.toInt() ?: 0)
RoomListLoadingState.NotLoaded -> RoomList.LoadingState.NotLoaded
}
}
private fun RoomListServiceState.toRoomListState(): RoomListService.State {
return when (this) {
RoomListServiceState.INIT,
RoomListServiceState.SETTING_UP -> RoomListService.State.Idle
RoomListServiceState.RUNNING -> RoomListService.State.Running
RoomListServiceState.ERROR -> RoomListService.State.Error
RoomListServiceState.TERMINATED -> RoomListService.State.Terminated
}
}
private fun org.matrix.rustcomponents.sdk.RoomList.observeEntriesWithProcessor(processor: RoomSummaryListProcessor): Flow<List<RoomListEntriesUpdate>> {
return entriesFlow { roomListEntries ->
processor.postEntries(roomListEntries)
}.onEach { update ->
processor.postUpdate(update)
}
}
private fun org.matrix.rustcomponents.sdk.RoomList.observeLoadingState(stateFlow: MutableStateFlow<RoomList.LoadingState>): Flow<RoomList.LoadingState> {
return loadingStateFlow()
.map { it.toLoadingState() }
.onEach {
stateFlow.value = it
}
}

View file

@ -17,19 +17,8 @@
package io.element.android.libraries.matrix.impl.sync
import io.element.android.libraries.matrix.api.sync.SyncState
import org.matrix.rustcomponents.sdk.RoomListServiceState
import org.matrix.rustcomponents.sdk.SyncServiceState
internal fun RoomListServiceState.toSyncState(): SyncState {
return when (this) {
RoomListServiceState.INIT,
RoomListServiceState.SETTING_UP -> SyncState.Idle
RoomListServiceState.RUNNING -> SyncState.Running
RoomListServiceState.ERROR -> SyncState.Error
RoomListServiceState.TERMINATED -> SyncState.Terminated
}
}
internal fun SyncServiceState.toSyncState(): SyncState {
return when (this) {
SyncServiceState.IDLE -> SyncState.Idle

View file

@ -19,36 +19,38 @@ package io.element.android.libraries.matrix.impl.sync
import io.element.android.libraries.matrix.api.sync.SyncService
import io.element.android.libraries.matrix.api.sync.SyncState
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.stateIn
import org.matrix.rustcomponents.sdk.RoomListServiceState
import org.matrix.rustcomponents.sdk.SyncServiceInterface
import org.matrix.rustcomponents.sdk.SyncServiceState
import timber.log.Timber
class RustSyncService(
private val innerSyncService: SyncServiceInterface,
roomListStateFlow: Flow<RoomListServiceState>,
sessionCoroutineScope: CoroutineScope
) : SyncService {
override suspend fun startSync() = runCatching {
Timber.i("Start sync")
innerSyncService.start()
}.onFailure {
Timber.d("Start sync failed: $it")
}
override fun stopSync() = runCatching {
Timber.i("Stop sync")
innerSyncService.pause()
}.onFailure {
Timber.d("Stop sync failed: $it")
}
override val syncState: StateFlow<SyncState> =
roomListStateFlow
.map(RoomListServiceState::toSyncState)
innerSyncService.stateFlow()
.map(SyncServiceState::toSyncState)
.onEach { state ->
Timber.i("Sync state=$state")
}

View file

@ -22,11 +22,11 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import org.matrix.rustcomponents.sdk.SyncService
import org.matrix.rustcomponents.sdk.SyncServiceInterface
import org.matrix.rustcomponents.sdk.SyncServiceState
import org.matrix.rustcomponents.sdk.SyncServiceStateObserver
fun SyncService.stateFlow(): Flow<SyncServiceState> =
fun SyncServiceInterface.stateFlow(): Flow<SyncServiceState> =
mxCallbackFlow {
val listener = object : SyncServiceStateObserver {
override fun onUpdate(state: SyncServiceState) {