Use mxCallbackFlow

This commit is contained in:
Benoit Marty 2024-02-22 09:11:08 +01:00
parent bd8b234f59
commit 26ea71193f
2 changed files with 18 additions and 23 deletions

View file

@ -138,7 +138,7 @@ class RustMatrixClient(
syncService = rustSyncService,
sessionCoroutineScope = sessionCoroutineScope,
dispatchers = dispatchers,
).apply { start() }
)
private val sessionDirectoryNameProvider = SessionDirectoryNameProvider()
private val isLoggingOut = AtomicBoolean(false)

View file

@ -25,7 +25,7 @@ import io.element.android.libraries.matrix.api.encryption.EncryptionService
import io.element.android.libraries.matrix.api.encryption.RecoveryState
import io.element.android.libraries.matrix.api.sync.SyncState
import io.element.android.libraries.matrix.impl.sync.RustSyncService
import io.element.android.libraries.matrix.impl.util.cancelAndDestroy
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.currentCoroutineContext
@ -46,7 +46,6 @@ import org.matrix.rustcomponents.sdk.Client
import org.matrix.rustcomponents.sdk.EnableRecoveryProgressListener
import org.matrix.rustcomponents.sdk.Encryption
import org.matrix.rustcomponents.sdk.RecoveryStateListener
import org.matrix.rustcomponents.sdk.TaskHandle
import org.matrix.rustcomponents.sdk.BackupState as RustBackupState
import org.matrix.rustcomponents.sdk.BackupUploadState as RustBackupUploadState
import org.matrix.rustcomponents.sdk.EnableRecoveryProgress as RustEnableRecoveryProgress
@ -66,10 +65,15 @@ internal class RustEncryptionService(
private val enableRecoveryProgressMapper = EnableRecoveryProgressMapper()
private val backupUploadStateMapper = BackupUploadStateMapper()
private val steadyStateExceptionMapper = SteadyStateExceptionMapper()
private var backupStateListenerTaskHandle: TaskHandle? = null
private var recoveryStateListenerTaskHandle: TaskHandle? = null
private val backupStateFlow = MutableStateFlow(service.backupState().let(backupStateMapper::map))
private val backupStateFlow = mxCallbackFlow {
val listener = object : BackupStateListener {
override fun onUpdate(status: RustBackupState) {
trySend(backupStateMapper.map(status))
}
}
service.backupStateListener(listener)
}.stateIn(sessionCoroutineScope, SharingStarted.Eagerly, service.backupState().let(backupStateMapper::map))
override val backupStateStateFlow = combine(
backupStateFlow,
@ -82,7 +86,14 @@ internal class RustEncryptionService(
}
}.stateIn(sessionCoroutineScope, SharingStarted.Eagerly, BackupState.WAITING_FOR_SYNC)
private val recoveryStateFlow: MutableStateFlow<RecoveryState> = MutableStateFlow(service.recoveryState().let(recoveryStateMapper::map))
private val recoveryStateFlow = mxCallbackFlow {
val listener = object : RecoveryStateListener {
override fun onUpdate(status: RustRecoveryState) {
trySend(recoveryStateMapper.map(status))
}
}
service.recoveryStateListener(listener)
}.stateIn(sessionCoroutineScope, SharingStarted.Eagerly, service.recoveryState().let(recoveryStateMapper::map))
override val recoveryStateStateFlow = combine(
recoveryStateFlow,
@ -111,23 +122,7 @@ internal class RustEncryptionService(
}
.stateIn(sessionCoroutineScope, SharingStarted.Eagerly, false)
fun start() {
backupStateListenerTaskHandle = service.backupStateListener(object : BackupStateListener {
override fun onUpdate(status: RustBackupState) {
backupStateFlow.value = backupStateMapper.map(status)
}
})
recoveryStateListenerTaskHandle = service.recoveryStateListener(object : RecoveryStateListener {
override fun onUpdate(status: RustRecoveryState) {
recoveryStateFlow.value = recoveryStateMapper.map(status)
}
})
}
fun destroy() {
backupStateListenerTaskHandle?.cancelAndDestroy()
recoveryStateListenerTaskHandle?.cancelAndDestroy()
service.destroy()
}