Create SyncOrchestrator (#4176)
* Create `SyncOrchestrator` to centralise the sync start/stop flow through the whole app: the decision is based on several inputs: sync state, network available, app in foreground, app in call, app needing to sync an event for a notification. * Make network monitor return network connectivity status, not internet connectivity * Don't stop the `SyncService` when network connection is lost, let it fail instead. This prevents an issue when using the offline mode of the SDK, which made the wrong UI states to be shown when the `SyncState` is `Idle` (that is, after the service being manually stopped). * Rename `NetworkStatus.Online/Offline` to `Connected/Disconnected` so they're not easily mistaken with internet connectivity instead
This commit is contained in:
parent
ce1c01e626
commit
3c87fb05b2
44 changed files with 851 additions and 344 deletions
|
|
@ -14,9 +14,7 @@ import androidx.compose.runtime.Composable
|
|||
import androidx.compose.runtime.collectAsState
|
||||
import androidx.compose.runtime.getValue
|
||||
import androidx.compose.ui.Modifier
|
||||
import androidx.lifecycle.Lifecycle
|
||||
import androidx.lifecycle.lifecycleScope
|
||||
import androidx.lifecycle.repeatOnLifecycle
|
||||
import com.bumble.appyx.core.composable.PermanentChild
|
||||
import com.bumble.appyx.core.lifecycle.subscribe
|
||||
import com.bumble.appyx.core.modality.BuildContext
|
||||
|
|
@ -52,8 +50,6 @@ import io.element.android.features.ftue.api.FtueEntryPoint
|
|||
import io.element.android.features.ftue.api.state.FtueService
|
||||
import io.element.android.features.ftue.api.state.FtueState
|
||||
import io.element.android.features.logout.api.LogoutEntryPoint
|
||||
import io.element.android.features.networkmonitor.api.NetworkMonitor
|
||||
import io.element.android.features.networkmonitor.api.NetworkStatus
|
||||
import io.element.android.features.preferences.api.PreferencesEntryPoint
|
||||
import io.element.android.features.roomdirectory.api.RoomDescription
|
||||
import io.element.android.features.roomdirectory.api.RoomDirectoryEntryPoint
|
||||
|
|
@ -77,18 +73,13 @@ import io.element.android.libraries.matrix.api.core.RoomIdOrAlias
|
|||
import io.element.android.libraries.matrix.api.core.UserId
|
||||
import io.element.android.libraries.matrix.api.core.toRoomIdOrAlias
|
||||
import io.element.android.libraries.matrix.api.permalink.PermalinkData
|
||||
import io.element.android.libraries.matrix.api.sync.SyncState
|
||||
import io.element.android.libraries.matrix.api.verification.SessionVerificationRequestDetails
|
||||
import io.element.android.libraries.matrix.api.verification.SessionVerificationServiceListener
|
||||
import io.element.android.libraries.preferences.api.store.EnableNativeSlidingSyncUseCase
|
||||
import io.element.android.services.appnavstate.api.AppNavigationStateService
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.debounce
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.flow.onStart
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.parcelize.Parcelize
|
||||
import timber.log.Timber
|
||||
|
|
@ -107,7 +98,6 @@ class LoggedInFlowNode @AssistedInject constructor(
|
|||
private val userProfileEntryPoint: UserProfileEntryPoint,
|
||||
private val ftueEntryPoint: FtueEntryPoint,
|
||||
private val coroutineScope: CoroutineScope,
|
||||
private val networkMonitor: NetworkMonitor,
|
||||
private val ftueService: FtueService,
|
||||
private val roomDirectoryEntryPoint: RoomDirectoryEntryPoint,
|
||||
private val shareEntryPoint: ShareEntryPoint,
|
||||
|
|
@ -133,7 +123,6 @@ class LoggedInFlowNode @AssistedInject constructor(
|
|||
fun onOpenBugReport()
|
||||
}
|
||||
|
||||
private val syncService = matrixClient.syncService()
|
||||
private val loggedInFlowProcessor = LoggedInEventProcessor(
|
||||
snackbarDispatcher,
|
||||
matrixClient.roomMembershipObserver(),
|
||||
|
|
@ -147,6 +136,7 @@ class LoggedInFlowNode @AssistedInject constructor(
|
|||
|
||||
override fun onBuilt() {
|
||||
super.onBuilt()
|
||||
|
||||
lifecycle.subscribe(
|
||||
onCreate = {
|
||||
appNavigationStateService.onNavigateToSession(id, matrixClient.sessionId)
|
||||
|
|
@ -165,12 +155,6 @@ class LoggedInFlowNode @AssistedInject constructor(
|
|||
}
|
||||
.launchIn(lifecycleScope)
|
||||
},
|
||||
onStop = {
|
||||
coroutineScope.launch {
|
||||
// Counterpart startSync is done in observeSyncStateAndNetworkStatus method.
|
||||
syncService.stopSync()
|
||||
}
|
||||
},
|
||||
onDestroy = {
|
||||
appNavigationStateService.onLeavingSpace(id)
|
||||
appNavigationStateService.onLeavingSession(id)
|
||||
|
|
@ -178,7 +162,6 @@ class LoggedInFlowNode @AssistedInject constructor(
|
|||
matrixClient.sessionVerificationService().setListener(null)
|
||||
}
|
||||
)
|
||||
observeSyncStateAndNetworkStatus()
|
||||
setupSendingQueue()
|
||||
}
|
||||
|
||||
|
|
@ -186,31 +169,6 @@ class LoggedInFlowNode @AssistedInject constructor(
|
|||
sendingQueue.launchIn(lifecycleScope)
|
||||
}
|
||||
|
||||
@OptIn(FlowPreview::class)
|
||||
private fun observeSyncStateAndNetworkStatus() {
|
||||
lifecycleScope.launch {
|
||||
repeatOnLifecycle(Lifecycle.State.STARTED) {
|
||||
combine(
|
||||
// small debounce to avoid spamming startSync when the state is changing quickly in case of error.
|
||||
syncService.syncState.debounce(100),
|
||||
networkMonitor.connectivity
|
||||
) { syncState, networkStatus ->
|
||||
Pair(syncState, networkStatus)
|
||||
}
|
||||
.onStart {
|
||||
// Temporary fix to ensure that the sync is started even if the networkStatus is offline.
|
||||
syncService.startSync()
|
||||
}
|
||||
.collect { (syncState, networkStatus) ->
|
||||
Timber.d("Sync state: $syncState, network status: $networkStatus")
|
||||
if (syncState != SyncState.Running && networkStatus == NetworkStatus.Online) {
|
||||
syncService.startSync()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sealed interface NavTarget : Parcelable {
|
||||
@Parcelize
|
||||
data object Placeholder : NavTarget
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ import dagger.assisted.Assisted
|
|||
import dagger.assisted.AssistedInject
|
||||
import im.vector.app.features.analytics.plan.JoinedRoom
|
||||
import io.element.android.anvilannotations.ContributesNode
|
||||
import io.element.android.appnav.di.MatrixClientsHolder
|
||||
import io.element.android.appnav.di.MatrixSessionCache
|
||||
import io.element.android.appnav.intent.IntentResolver
|
||||
import io.element.android.appnav.intent.ResolvedIntent
|
||||
import io.element.android.appnav.root.RootNavStateFlowFactory
|
||||
|
|
@ -62,7 +62,7 @@ class RootFlowNode @AssistedInject constructor(
|
|||
@Assisted plugins: List<Plugin>,
|
||||
private val authenticationService: MatrixAuthenticationService,
|
||||
private val navStateFlowFactory: RootNavStateFlowFactory,
|
||||
private val matrixClientsHolder: MatrixClientsHolder,
|
||||
private val matrixSessionCache: MatrixSessionCache,
|
||||
private val presenter: RootPresenter,
|
||||
private val bugReportEntryPoint: BugReportEntryPoint,
|
||||
private val viewFolderEntryPoint: ViewFolderEntryPoint,
|
||||
|
|
@ -78,14 +78,14 @@ class RootFlowNode @AssistedInject constructor(
|
|||
plugins = plugins
|
||||
) {
|
||||
override fun onBuilt() {
|
||||
matrixClientsHolder.restoreWithSavedState(buildContext.savedStateMap)
|
||||
matrixSessionCache.restoreWithSavedState(buildContext.savedStateMap)
|
||||
super.onBuilt()
|
||||
observeNavState()
|
||||
}
|
||||
|
||||
override fun onSaveInstanceState(state: MutableSavedStateMap) {
|
||||
super.onSaveInstanceState(state)
|
||||
matrixClientsHolder.saveIntoSavedState(state)
|
||||
matrixSessionCache.saveIntoSavedState(state)
|
||||
navStateFlowFactory.saveIntoSavedState(state)
|
||||
}
|
||||
|
||||
|
|
@ -118,7 +118,7 @@ class RootFlowNode @AssistedInject constructor(
|
|||
}
|
||||
|
||||
private fun switchToNotLoggedInFlow() {
|
||||
matrixClientsHolder.removeAll()
|
||||
matrixSessionCache.removeAll()
|
||||
backstack.safeRoot(NavTarget.NotLoggedInFlow)
|
||||
}
|
||||
|
||||
|
|
@ -131,7 +131,7 @@ class RootFlowNode @AssistedInject constructor(
|
|||
onFailure: () -> Unit,
|
||||
onSuccess: (SessionId) -> Unit,
|
||||
) {
|
||||
matrixClientsHolder.getOrRestore(sessionId)
|
||||
matrixSessionCache.getOrRestore(sessionId)
|
||||
.onSuccess {
|
||||
Timber.v("Succeed to restore session $sessionId")
|
||||
onSuccess(sessionId)
|
||||
|
|
@ -200,7 +200,7 @@ class RootFlowNode @AssistedInject constructor(
|
|||
override fun resolve(navTarget: NavTarget, buildContext: BuildContext): Node {
|
||||
return when (navTarget) {
|
||||
is NavTarget.LoggedInFlow -> {
|
||||
val matrixClient = matrixClientsHolder.getOrNull(navTarget.sessionId) ?: return splashNode(buildContext).also {
|
||||
val matrixClient = matrixSessionCache.getOrNull(navTarget.sessionId) ?: return splashNode(buildContext).also {
|
||||
Timber.w("Couldn't find any session, go through SplashScreen")
|
||||
}
|
||||
val inputs = LoggedInAppScopeFlowNode.Inputs(matrixClient)
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
package io.element.android.appnav.di
|
||||
|
||||
import androidx.annotation.VisibleForTesting
|
||||
import com.bumble.appyx.core.state.MutableSavedStateMap
|
||||
import com.bumble.appyx.core.state.SavedStateMap
|
||||
import com.squareup.anvil.annotations.ContributesBinding
|
||||
|
|
@ -25,45 +26,61 @@ import javax.inject.Inject
|
|||
|
||||
private const val SAVE_INSTANCE_KEY = "io.element.android.x.di.MatrixClientsHolder.SaveInstanceKey"
|
||||
|
||||
/**
|
||||
* In-memory cache for logged in Matrix sessions.
|
||||
*
|
||||
* This component contains both the [MatrixClient] and the [SyncOrchestrator] for each session.
|
||||
*/
|
||||
@SingleIn(AppScope::class)
|
||||
@ContributesBinding(AppScope::class)
|
||||
class MatrixClientsHolder @Inject constructor(
|
||||
class MatrixSessionCache @Inject constructor(
|
||||
private val authenticationService: MatrixAuthenticationService,
|
||||
private val syncOrchestratorFactory: SyncOrchestrator.Factory,
|
||||
) : MatrixClientProvider {
|
||||
private val sessionIdsToMatrixClient = ConcurrentHashMap<SessionId, MatrixClient>()
|
||||
private val sessionIdsToMatrixSession = ConcurrentHashMap<SessionId, InMemoryMatrixSession>()
|
||||
private val restoreMutex = Mutex()
|
||||
|
||||
init {
|
||||
authenticationService.listenToNewMatrixClients { matrixClient ->
|
||||
sessionIdsToMatrixClient[matrixClient.sessionId] = matrixClient
|
||||
val syncOrchestrator = syncOrchestratorFactory.create(matrixClient)
|
||||
sessionIdsToMatrixSession[matrixClient.sessionId] = InMemoryMatrixSession(
|
||||
matrixClient = matrixClient,
|
||||
syncOrchestrator = syncOrchestrator,
|
||||
)
|
||||
syncOrchestrator.start()
|
||||
}
|
||||
}
|
||||
|
||||
fun removeAll() {
|
||||
sessionIdsToMatrixClient.clear()
|
||||
sessionIdsToMatrixSession.clear()
|
||||
}
|
||||
|
||||
fun remove(sessionId: SessionId) {
|
||||
sessionIdsToMatrixClient.remove(sessionId)
|
||||
sessionIdsToMatrixSession.remove(sessionId)
|
||||
}
|
||||
|
||||
override fun getOrNull(sessionId: SessionId): MatrixClient? {
|
||||
return sessionIdsToMatrixClient[sessionId]
|
||||
return sessionIdsToMatrixSession[sessionId]?.matrixClient
|
||||
}
|
||||
|
||||
override suspend fun getOrRestore(sessionId: SessionId): Result<MatrixClient> {
|
||||
return restoreMutex.withLock {
|
||||
when (val matrixClient = getOrNull(sessionId)) {
|
||||
when (val cached = getOrNull(sessionId)) {
|
||||
null -> restore(sessionId)
|
||||
else -> Result.success(matrixClient)
|
||||
else -> Result.success(cached)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting(otherwise = VisibleForTesting.PRIVATE)
|
||||
internal fun getSyncOrchestrator(sessionId: SessionId): SyncOrchestrator? {
|
||||
return sessionIdsToMatrixSession[sessionId]?.syncOrchestrator
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
fun restoreWithSavedState(state: SavedStateMap?) {
|
||||
Timber.d("Restore state")
|
||||
if (state == null || sessionIdsToMatrixClient.isNotEmpty()) {
|
||||
if (state == null || sessionIdsToMatrixSession.isNotEmpty()) {
|
||||
Timber.w("Restore with non-empty map")
|
||||
return
|
||||
}
|
||||
|
|
@ -79,7 +96,7 @@ class MatrixClientsHolder @Inject constructor(
|
|||
}
|
||||
|
||||
fun saveIntoSavedState(state: MutableSavedStateMap) {
|
||||
val sessionKeys = sessionIdsToMatrixClient.keys.toTypedArray()
|
||||
val sessionKeys = sessionIdsToMatrixSession.keys.toTypedArray()
|
||||
Timber.d("Save matrix session keys = ${sessionKeys.map { it.value }}")
|
||||
state[SAVE_INSTANCE_KEY] = sessionKeys
|
||||
}
|
||||
|
|
@ -88,10 +105,20 @@ class MatrixClientsHolder @Inject constructor(
|
|||
Timber.d("Restore matrix session: $sessionId")
|
||||
return authenticationService.restoreSession(sessionId)
|
||||
.onSuccess { matrixClient ->
|
||||
sessionIdsToMatrixClient[matrixClient.sessionId] = matrixClient
|
||||
val syncOrchestrator = syncOrchestratorFactory.create(matrixClient)
|
||||
sessionIdsToMatrixSession[matrixClient.sessionId] = InMemoryMatrixSession(
|
||||
matrixClient = matrixClient,
|
||||
syncOrchestrator = syncOrchestrator,
|
||||
)
|
||||
syncOrchestrator.start()
|
||||
}
|
||||
.onFailure {
|
||||
Timber.e(it, "Fail to restore session")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private data class InMemoryMatrixSession(
|
||||
val matrixClient: MatrixClient,
|
||||
val syncOrchestrator: SyncOrchestrator,
|
||||
)
|
||||
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Copyright 2025 New Vector Ltd.
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
* Please see LICENSE files in the repository root for full details.
|
||||
*/
|
||||
|
||||
package io.element.android.appnav.di
|
||||
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedFactory
|
||||
import dagger.assisted.AssistedInject
|
||||
import io.element.android.features.networkmonitor.api.NetworkMonitor
|
||||
import io.element.android.features.networkmonitor.api.NetworkStatus
|
||||
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
|
||||
import io.element.android.libraries.core.coroutine.childScope
|
||||
import io.element.android.libraries.matrix.api.MatrixClient
|
||||
import io.element.android.libraries.matrix.api.sync.SyncState
|
||||
import io.element.android.services.appnavstate.api.AppForegroundStateService
|
||||
import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.debounce
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onCompletion
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import timber.log.Timber
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
|
||||
class SyncOrchestrator @AssistedInject constructor(
|
||||
@Assisted matrixClient: MatrixClient,
|
||||
private val appForegroundStateService: AppForegroundStateService,
|
||||
private val networkMonitor: NetworkMonitor,
|
||||
dispatchers: CoroutineDispatchers,
|
||||
) {
|
||||
@AssistedFactory
|
||||
interface Factory {
|
||||
fun create(matrixClient: MatrixClient): SyncOrchestrator
|
||||
}
|
||||
|
||||
private val syncService = matrixClient.syncService()
|
||||
|
||||
private val tag = "SyncOrchestrator"
|
||||
|
||||
private val coroutineScope = matrixClient.sessionCoroutineScope.childScope(dispatchers.io, tag)
|
||||
|
||||
private val started = AtomicBoolean(false)
|
||||
|
||||
/**
|
||||
* Starting observing the app state and network state to start/stop the sync service.
|
||||
*
|
||||
* Before observing the state, a first attempt at starting the sync service will happen if it's not already running.
|
||||
*/
|
||||
@OptIn(FlowPreview::class)
|
||||
fun start() {
|
||||
if (!started.compareAndSet(false, true)) {
|
||||
Timber.tag(tag).d("already started, exiting early")
|
||||
return
|
||||
}
|
||||
|
||||
Timber.tag(tag).d("start observing the app and network state")
|
||||
|
||||
combine(
|
||||
// small debounce to avoid spamming startSync when the state is changing quickly in case of error.
|
||||
syncService.syncState.debounce(100.milliseconds),
|
||||
networkMonitor.connectivity,
|
||||
appForegroundStateService.isInForeground,
|
||||
appForegroundStateService.isInCall,
|
||||
appForegroundStateService.isSyncingNotificationEvent,
|
||||
) { syncState, networkState, isInForeground, isInCall, isSyncingNotificationEvent ->
|
||||
val isAppActive = isInForeground || isInCall || isSyncingNotificationEvent
|
||||
val isNetworkAvailable = networkState == NetworkStatus.Connected
|
||||
|
||||
Timber.tag(tag).d("isAppActive=$isAppActive, isNetworkAvailable=$isNetworkAvailable")
|
||||
if (syncState == SyncState.Running && !isAppActive) {
|
||||
SyncStateAction.StopSync
|
||||
} else if (syncState != SyncState.Running && isAppActive && isNetworkAvailable) {
|
||||
SyncStateAction.StartSync
|
||||
} else {
|
||||
SyncStateAction.NoOp
|
||||
}
|
||||
}
|
||||
.distinctUntilChanged()
|
||||
.debounce { action ->
|
||||
// Don't stop the sync immediately, wait a bit to avoid starting/stopping the sync too often
|
||||
if (action == SyncStateAction.StopSync) 3.seconds else 0.seconds
|
||||
}
|
||||
.onEach { action ->
|
||||
when (action) {
|
||||
SyncStateAction.StartSync -> {
|
||||
syncService.startSync()
|
||||
}
|
||||
SyncStateAction.StopSync -> {
|
||||
syncService.stopSync()
|
||||
}
|
||||
SyncStateAction.NoOp -> Unit
|
||||
}
|
||||
}
|
||||
.onCompletion {
|
||||
Timber.tag(tag).d("has been stopped")
|
||||
}
|
||||
.launchIn(coroutineScope)
|
||||
}
|
||||
}
|
||||
|
||||
private enum class SyncStateAction {
|
||||
StartSync,
|
||||
StopSync,
|
||||
NoOp,
|
||||
}
|
||||
|
|
@ -32,7 +32,7 @@ class SendQueues @Inject constructor(
|
|||
) {
|
||||
/**
|
||||
* Launches the send queues retry mechanism in the given [coroutineScope].
|
||||
* Makes sure to re-enable all send queues when the network status is [NetworkStatus.Online].
|
||||
* Makes sure to re-enable all send queues when the network status is [NetworkStatus.Connected].
|
||||
*/
|
||||
@OptIn(FlowPreview::class)
|
||||
fun launchIn(coroutineScope: CoroutineScope) {
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ package io.element.android.appnav.root
|
|||
|
||||
import com.bumble.appyx.core.state.MutableSavedStateMap
|
||||
import com.bumble.appyx.core.state.SavedStateMap
|
||||
import io.element.android.appnav.di.MatrixClientsHolder
|
||||
import io.element.android.appnav.di.MatrixSessionCache
|
||||
import io.element.android.features.login.api.LoginUserStory
|
||||
import io.element.android.features.preferences.api.CacheService
|
||||
import io.element.android.libraries.matrix.api.auth.MatrixAuthenticationService
|
||||
|
|
@ -31,7 +31,7 @@ private const val SAVE_INSTANCE_KEY = "io.element.android.x.RootNavStateFlowFact
|
|||
class RootNavStateFlowFactory @Inject constructor(
|
||||
private val authenticationService: MatrixAuthenticationService,
|
||||
private val cacheService: CacheService,
|
||||
private val matrixClientsHolder: MatrixClientsHolder,
|
||||
private val matrixSessionCache: MatrixSessionCache,
|
||||
private val imageLoaderHolder: ImageLoaderHolder,
|
||||
private val loginUserStory: LoginUserStory,
|
||||
private val sessionPreferencesStoreFactory: SessionPreferencesStoreFactory,
|
||||
|
|
@ -63,7 +63,7 @@ class RootNavStateFlowFactory @Inject constructor(
|
|||
val initialCacheIndex = savedStateMap.getCacheIndexOrDefault()
|
||||
return cacheService.clearedCacheEventFlow
|
||||
.onEach { sessionId ->
|
||||
matrixClientsHolder.remove(sessionId)
|
||||
matrixSessionCache.remove(sessionId)
|
||||
// Ensure image loader will be recreated with the new MatrixClient
|
||||
imageLoaderHolder.remove(sessionId)
|
||||
// Also remove cached value for SessionPreferencesStore
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue