diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt index e642813e3b..521d26f062 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt @@ -138,7 +138,7 @@ class RustMatrixClient( syncService = rustSyncService, sessionCoroutineScope = sessionCoroutineScope, dispatchers = dispatchers, - ).apply { start() } + ) private val sessionDirectoryNameProvider = SessionDirectoryNameProvider() private val isLoggingOut = AtomicBoolean(false) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/encryption/RustEncryptionService.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/encryption/RustEncryptionService.kt index 84e9fb2f14..fda22122fd 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/encryption/RustEncryptionService.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/encryption/RustEncryptionService.kt @@ -25,7 +25,7 @@ import io.element.android.libraries.matrix.api.encryption.EncryptionService import io.element.android.libraries.matrix.api.encryption.RecoveryState import io.element.android.libraries.matrix.api.sync.SyncState import io.element.android.libraries.matrix.impl.sync.RustSyncService -import io.element.android.libraries.matrix.impl.util.cancelAndDestroy +import io.element.android.libraries.matrix.impl.util.mxCallbackFlow import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.currentCoroutineContext @@ -46,7 +46,6 @@ import org.matrix.rustcomponents.sdk.Client import org.matrix.rustcomponents.sdk.EnableRecoveryProgressListener import org.matrix.rustcomponents.sdk.Encryption import org.matrix.rustcomponents.sdk.RecoveryStateListener -import org.matrix.rustcomponents.sdk.TaskHandle import org.matrix.rustcomponents.sdk.BackupState as RustBackupState import org.matrix.rustcomponents.sdk.BackupUploadState as RustBackupUploadState import org.matrix.rustcomponents.sdk.EnableRecoveryProgress as RustEnableRecoveryProgress @@ -66,10 +65,15 @@ internal class RustEncryptionService( private val enableRecoveryProgressMapper = EnableRecoveryProgressMapper() private val backupUploadStateMapper = BackupUploadStateMapper() private val steadyStateExceptionMapper = SteadyStateExceptionMapper() - private var backupStateListenerTaskHandle: TaskHandle? = null - private var recoveryStateListenerTaskHandle: TaskHandle? = null - private val backupStateFlow = MutableStateFlow(service.backupState().let(backupStateMapper::map)) + private val backupStateFlow = mxCallbackFlow { + val listener = object : BackupStateListener { + override fun onUpdate(status: RustBackupState) { + trySend(backupStateMapper.map(status)) + } + } + service.backupStateListener(listener) + }.stateIn(sessionCoroutineScope, SharingStarted.Eagerly, service.backupState().let(backupStateMapper::map)) override val backupStateStateFlow = combine( backupStateFlow, @@ -82,7 +86,14 @@ internal class RustEncryptionService( } }.stateIn(sessionCoroutineScope, SharingStarted.Eagerly, BackupState.WAITING_FOR_SYNC) - private val recoveryStateFlow: MutableStateFlow = MutableStateFlow(service.recoveryState().let(recoveryStateMapper::map)) + private val recoveryStateFlow = mxCallbackFlow { + val listener = object : RecoveryStateListener { + override fun onUpdate(status: RustRecoveryState) { + trySend(recoveryStateMapper.map(status)) + } + } + service.recoveryStateListener(listener) + }.stateIn(sessionCoroutineScope, SharingStarted.Eagerly, service.recoveryState().let(recoveryStateMapper::map)) override val recoveryStateStateFlow = combine( recoveryStateFlow, @@ -111,23 +122,7 @@ internal class RustEncryptionService( } .stateIn(sessionCoroutineScope, SharingStarted.Eagerly, false) - fun start() { - backupStateListenerTaskHandle = service.backupStateListener(object : BackupStateListener { - override fun onUpdate(status: RustBackupState) { - backupStateFlow.value = backupStateMapper.map(status) - } - }) - - recoveryStateListenerTaskHandle = service.recoveryStateListener(object : RecoveryStateListener { - override fun onUpdate(status: RustRecoveryState) { - recoveryStateFlow.value = recoveryStateMapper.map(status) - } - }) - } - fun destroy() { - backupStateListenerTaskHandle?.cancelAndDestroy() - recoveryStateListenerTaskHandle?.cancelAndDestroy() service.destroy() }