diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt index 17e75afad1..1ab185d836 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt @@ -118,44 +118,44 @@ import org.matrix.rustcomponents.sdk.RoomVisibility as RustRoomVisibility import org.matrix.rustcomponents.sdk.SyncService as ClientSyncService class RustMatrixClient( - private val client: Client, + private val innerClient: Client, private val baseDirectory: File, private val sessionStore: SessionStore, private val appCoroutineScope: CoroutineScope, private val sessionDelegate: RustClientSessionDelegate, - syncService: ClientSyncService, + innerSyncService: ClientSyncService, dispatchers: CoroutineDispatchers, baseCacheDirectory: File, clock: SystemClock, timelineEventTypeFilterFactory: TimelineEventTypeFilterFactory, featureFlagService: FeatureFlagService, ) : MatrixClient { - override val sessionId: UserId = UserId(client.userId()) - override val deviceId: DeviceId = DeviceId(client.deviceId()) + override val sessionId: UserId = UserId(innerClient.userId()) + override val deviceId: DeviceId = DeviceId(innerClient.deviceId()) override val sessionCoroutineScope = appCoroutineScope.childScope(dispatchers.main, "Session-$sessionId") - - private val innerRoomListService = syncService.roomListService() private val sessionDispatcher = dispatchers.io.limitedParallelism(64) - private val rustSyncService = RustSyncService(syncService, sessionCoroutineScope) + private val innerRoomListService = innerSyncService.roomListService() + + private val rustSyncService = RustSyncService(innerSyncService, sessionCoroutineScope) private val pushersService = RustPushersService( - client = client, + client = innerClient, dispatchers = dispatchers, ) - private val notificationProcessSetup = NotificationProcessSetup.SingleProcess(syncService) - private val notificationClient = runBlocking { client.notificationClient(notificationProcessSetup) } - private val notificationService = RustNotificationService(notificationClient, dispatchers, clock) - private val notificationSettingsService = RustNotificationSettingsService(client, dispatchers) + private val notificationProcessSetup = NotificationProcessSetup.SingleProcess(innerSyncService) + private val innerNotificationClient = runBlocking { innerClient.notificationClient(notificationProcessSetup) } + private val notificationService = RustNotificationService(innerNotificationClient, dispatchers, clock) + private val notificationSettingsService = RustNotificationSettingsService(innerClient, dispatchers) .apply { start() } private val encryptionService = RustEncryptionService( - client = client, + client = innerClient, syncService = rustSyncService, sessionCoroutineScope = sessionCoroutineScope, dispatchers = dispatchers, ) private val roomDirectoryService = RustRoomDirectoryService( - client = client, + client = innerClient, sessionDispatcher = sessionDispatcher, ) @@ -175,7 +175,7 @@ class RustMatrixClient( ) private val verificationService = RustSessionVerificationService( - client = client, + client = innerClient, isSyncServiceReady = rustSyncService.syncState.map { it == SyncState.Running }, sessionCoroutineScope = sessionCoroutineScope, ) @@ -198,26 +198,26 @@ class RustMatrixClient( override val mediaLoader: MatrixMediaLoader = RustMediaLoader( baseCacheDirectory = baseCacheDirectory, dispatchers = dispatchers, - innerClient = client, + innerClient = innerClient, ) private val roomMembershipObserver = RoomMembershipObserver() - private var clientDelegateTaskHandle: TaskHandle? = client.setDelegate(sessionDelegate) + private var clientDelegateTaskHandle: TaskHandle? = innerClient.setDelegate(sessionDelegate) private val _userProfile: MutableStateFlow = MutableStateFlow( MatrixUser( userId = sessionId, // TODO cache for displayName? displayName = null, - avatarUrl = client.cachedAvatarUrl(), + avatarUrl = innerClient.cachedAvatarUrl(), ) ) override val userProfile: StateFlow = _userProfile override val ignoredUsersFlow = mxCallbackFlow> { - client.subscribeToIgnoredUsers(object : IgnoredUsersListener { + innerClient.subscribeToIgnoredUsers(object : IgnoredUsersListener { override fun call(ignoredUserIds: List) { channel.trySend(ignoredUserIds.map(::UserId).toPersistentList()) } @@ -238,7 +238,7 @@ class RustMatrixClient( override fun userIdServerName(): String { return runCatching { - client.userIdServerName() + innerClient.userIdServerName() } .onFailure { Timber.w(it, "Failed to get userIdServerName") @@ -249,7 +249,7 @@ class RustMatrixClient( override suspend fun getUrl(url: String): Result = withContext(sessionDispatcher) { runCatching { - client.getUrl(url) + innerClient.getUrl(url) } } @@ -279,23 +279,23 @@ class RustMatrixClient( .filter { roomSummary -> roomSummary.info.currentUserMembership == currentUserMembership } .first() // Ensure that the room is ready - .also { client.awaitRoomRemoteEcho(it.roomId.value) } + .also { innerClient.awaitRoomRemoteEcho(it.roomId.value) } } } override suspend fun findDM(userId: UserId): RoomId? { - return client.getDmRoom(userId.value)?.use { RoomId(it.id()) } + return innerClient.getDmRoom(userId.value)?.use { RoomId(it.id()) } } override suspend fun ignoreUser(userId: UserId): Result = withContext(sessionDispatcher) { runCatching { - client.ignoreUser(userId.value) + innerClient.ignoreUser(userId.value) } } override suspend fun unignoreUser(userId: UserId): Result = withContext(sessionDispatcher) { runCatching { - client.unignoreUser(userId.value) + innerClient.unignoreUser(userId.value) } } @@ -338,7 +338,7 @@ class RustMatrixClient( }, canonicalAlias = createRoomParams.roomAliasName.getOrNull(), ) - val roomId = RoomId(client.createRoom(rustParams)) + val roomId = RoomId(innerClient.createRoom(rustParams)) // Wait to receive the room back from the sync but do not returns failure if it fails. try { awaitRoom(roomId.toRoomIdOrAlias(), 30.seconds, CurrentUserMembership.JOINED) @@ -363,7 +363,7 @@ class RustMatrixClient( override suspend fun getProfile(userId: UserId): Result = withContext(sessionDispatcher) { runCatching { - client.getProfile(userId.value).let(UserProfileMapper::map) + innerClient.getProfile(userId.value).let(UserProfileMapper::map) } } @@ -373,28 +373,28 @@ class RustMatrixClient( override suspend fun searchUsers(searchTerm: String, limit: Long): Result = withContext(sessionDispatcher) { runCatching { - client.searchUsers(searchTerm, limit.toULong()).let(UserSearchResultMapper::map) + innerClient.searchUsers(searchTerm, limit.toULong()).let(UserSearchResultMapper::map) } } override suspend fun setDisplayName(displayName: String): Result = withContext(sessionDispatcher) { - runCatching { client.setDisplayName(displayName) } + runCatching { innerClient.setDisplayName(displayName) } } override suspend fun uploadAvatar(mimeType: String, data: ByteArray): Result = withContext(sessionDispatcher) { - runCatching { client.uploadAvatar(mimeType, data) } + runCatching { innerClient.uploadAvatar(mimeType, data) } } override suspend fun removeAvatar(): Result = withContext(sessionDispatcher) { - runCatching { client.removeAvatar() } + runCatching { innerClient.removeAvatar() } } override suspend fun joinRoom(roomId: RoomId): Result = withContext(sessionDispatcher) { runCatching { - client.joinRoomById(roomId.value).destroy() + innerClient.joinRoomById(roomId.value).destroy() try { awaitRoom(roomId.toRoomIdOrAlias(), 10.seconds, CurrentUserMembership.JOINED) } catch (e: Exception) { @@ -406,7 +406,7 @@ class RustMatrixClient( override suspend fun joinRoomByIdOrAlias(roomIdOrAlias: RoomIdOrAlias, serverNames: List): Result = withContext(sessionDispatcher) { runCatching { - client.joinRoomByIdOrAlias( + innerClient.joinRoomByIdOrAlias( roomIdOrAlias = roomIdOrAlias.identifier, serverNames = serverNames, ).destroy() @@ -423,7 +423,7 @@ class RustMatrixClient( sessionDispatcher ) { runCatching { - client.knock(roomIdOrAlias.identifier, message, serverNames).destroy() + innerClient.knock(roomIdOrAlias.identifier, message, serverNames).destroy() try { awaitRoom(roomIdOrAlias, 10.seconds, CurrentUserMembership.KNOCKED) } catch (e: Exception) { @@ -435,19 +435,19 @@ class RustMatrixClient( override suspend fun trackRecentlyVisitedRoom(roomId: RoomId): Result = withContext(sessionDispatcher) { runCatching { - client.trackRecentlyVisitedRoom(roomId.value) + innerClient.trackRecentlyVisitedRoom(roomId.value) } } override suspend fun getRecentlyVisitedRooms(): Result> = withContext(sessionDispatcher) { runCatching { - client.getRecentlyVisitedRooms().map(::RoomId) + innerClient.getRecentlyVisitedRooms().map(::RoomId) } } override suspend fun resolveRoomAlias(roomAlias: RoomAlias): Result> = withContext(sessionDispatcher) { runCatching { - val result = client.resolveRoomAlias(roomAlias.value)?.let { + val result = innerClient.resolveRoomAlias(roomAlias.value)?.let { ResolvedRoomAlias( roomId = RoomId(it.roomId), servers = it.servers, @@ -460,8 +460,8 @@ class RustMatrixClient( override suspend fun getRoomPreviewInfo(roomIdOrAlias: RoomIdOrAlias, serverNames: List): Result = withContext(sessionDispatcher) { runCatching { when (roomIdOrAlias) { - is RoomIdOrAlias.Alias -> client.getRoomPreviewFromRoomAlias(roomIdOrAlias.roomAlias.value) - is RoomIdOrAlias.Id -> client.getRoomPreviewFromRoomId(roomIdOrAlias.roomId.value, serverNames) + is RoomIdOrAlias.Alias -> innerClient.getRoomPreviewFromRoomAlias(roomIdOrAlias.roomAlias.value) + is RoomIdOrAlias.Id -> innerClient.getRoomPreviewFromRoomId(roomIdOrAlias.roomId.value, serverNames) }.use { roomPreview -> RoomPreviewInfoMapper.map(roomPreview.info()) } @@ -491,11 +491,6 @@ class RustMatrixClient( clientDelegateTaskHandle?.cancelAndDestroy() notificationSettingsService.destroy() verificationService.destroy() - innerRoomListService.destroy() - notificationClient.destroy() - notificationProcessSetup.destroy() - encryptionService.destroy() - client.destroy() } override suspend fun getCacheSize(): Long { @@ -515,13 +510,13 @@ class RustMatrixClient( withContext(sessionDispatcher) { if (userInitiated) { try { - result = client.logout() + result = innerClient.logout() } catch (failure: Throwable) { if (ignoreSdkError) { Timber.e(failure, "Fail to call logout on HS. Still delete local files.") } else { // If the logout failed we need to restore the delegate - clientDelegateTaskHandle = client.setDelegate(sessionDelegate) + clientDelegateTaskHandle = innerClient.setDelegate(sessionDelegate) Timber.e(failure, "Fail to call logout on HS.") throw failure } @@ -539,7 +534,7 @@ class RustMatrixClient( override fun canDeactivateAccount(): Boolean { return runCatching { - client.canDeactivateAccount() + innerClient.canDeactivateAccount() } .getOrNull() .orFalse() @@ -553,7 +548,7 @@ class RustMatrixClient( runCatching { // First call without AuthData, should fail val firstAttempt = runCatching { - client.deactivateAccount( + innerClient.deactivateAccount( authData = null, eraseData = eraseData, ) @@ -562,7 +557,7 @@ class RustMatrixClient( Timber.w(firstAttempt.exceptionOrNull(), "Expected failure, try again") // This is expected, try again with the password runCatching { - client.deactivateAccount( + innerClient.deactivateAccount( authData = AuthData.Password( passwordDetails = AuthDataPasswordDetails( identifier = sessionId.value, @@ -574,7 +569,7 @@ class RustMatrixClient( }.onFailure { Timber.e(it, "Failed to deactivate account") // If the deactivation failed we need to restore the delegate - clientDelegateTaskHandle = client.setDelegate(sessionDelegate) + clientDelegateTaskHandle = innerClient.setDelegate(sessionDelegate) throw it } } @@ -589,13 +584,13 @@ class RustMatrixClient( override suspend fun getAccountManagementUrl(action: AccountManagementAction?): Result = withContext(sessionDispatcher) { val rustAction = action?.toRustAction() runCatching { - client.accountUrl(rustAction) + innerClient.accountUrl(rustAction) } } override suspend fun uploadMedia(mimeType: String, data: ByteArray, progressCallback: ProgressCallback?): Result = withContext(sessionDispatcher) { runCatching { - client.uploadMedia(mimeType, data, progressCallback?.toProgressWatcher()) + innerClient.uploadMedia(mimeType, data, progressCallback?.toProgressWatcher()) } } @@ -622,13 +617,13 @@ class RustMatrixClient( withContext(sessionDispatcher) { Timber.i("setAllSendQueuesEnabled($enabled)") tryOrNull { - client.enableAllSendQueues(enabled) + innerClient.enableAllSendQueues(enabled) } } } override fun sendQueueDisabledFlow(): Flow = mxCallbackFlow { - client.subscribeToSendQueueStatus(object : SendQueueRoomErrorListener { + innerClient.subscribeToSendQueueStatus(object : SendQueueRoomErrorListener { override fun onError(roomId: String, error: ClientException) { trySend(RoomId(roomId)) } @@ -637,13 +632,13 @@ class RustMatrixClient( override suspend fun availableSlidingSyncVersions(): Result> = withContext(sessionDispatcher) { runCatching { - client.availableSlidingSyncVersions().map { it.map() } + innerClient.availableSlidingSyncVersions().map { it.map() } } } override suspend fun currentSlidingSyncVersion(): Result = withContext(sessionDispatcher) { runCatching { - client.session().slidingSyncVersion.map() + innerClient.session().slidingSyncVersion.map() } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClientFactory.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClientFactory.kt index e22eda03f2..0ce1d2362b 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClientFactory.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClientFactory.kt @@ -77,12 +77,12 @@ class RustMatrixClientFactory @Inject constructor( .finish() return RustMatrixClient( - client = client, + innerClient = client, baseDirectory = baseDirectory, sessionStore = sessionStore, appCoroutineScope = appCoroutineScope, sessionDelegate = sessionDelegate, - syncService = syncService, + innerSyncService = syncService, dispatchers = coroutineDispatchers, baseCacheDirectory = cacheDirectory, clock = clock, diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/encryption/RustEncryptionService.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/encryption/RustEncryptionService.kt index 31dd6b90a1..f86644231f 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/encryption/RustEncryptionService.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/encryption/RustEncryptionService.kt @@ -94,10 +94,6 @@ internal class RustEncryptionService( } .stateIn(sessionCoroutineScope, SharingStarted.Eagerly, false) - fun destroy() { - service.destroy() - } - override suspend fun enableBackups(): Result = withContext(dispatchers.io) { runCatching { service.enableBackups() diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/notificationsettings/RustNotificationSettingsService.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/notificationsettings/RustNotificationSettingsService.kt index 561a9ec7cb..6c6f2cac4c 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/notificationsettings/RustNotificationSettingsService.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/notificationsettings/RustNotificationSettingsService.kt @@ -42,7 +42,6 @@ class RustNotificationSettingsService( fun destroy() { notificationSettings.setDelegate(null) - notificationSettings.destroy() } override suspend fun getRoomNotificationSettings(roomId: RoomId, isEncrypted: Boolean, isOneToOne: Boolean): Result = diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/RustSyncService.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/RustSyncService.kt index 0a859ee146..a5da538b51 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/RustSyncService.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/RustSyncService.kt @@ -10,12 +10,14 @@ package io.element.android.libraries.matrix.impl.sync import io.element.android.libraries.matrix.api.sync.SyncService import io.element.android.libraries.matrix.api.sync.SyncState import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.stateIn +import kotlinx.coroutines.withContext import org.matrix.rustcomponents.sdk.SyncServiceState import timber.log.Timber import java.util.concurrent.atomic.AtomicBoolean @@ -49,12 +51,11 @@ class RustSyncService( Timber.d("Stop sync failed: $it") } - suspend fun destroy() { + suspend fun destroy() = withContext(NonCancellable) { // If the service was still running, stop it stopSync() Timber.d("Destroying sync service") isServiceReady.set(false) - innerSyncService.destroy() } override val syncState: StateFlow = diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/verification/RustSessionVerificationService.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/verification/RustSessionVerificationService.kt index ed3ddec98b..4a521244db 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/verification/RustSessionVerificationService.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/verification/RustSessionVerificationService.kt @@ -218,7 +218,6 @@ class RustSessionVerificationService( recoveryStateListenerTaskHandle.cancelAndDestroy() if (this::verificationController.isInitialized) { verificationController.setDelegate(null) - verificationController.destroy() } } diff --git a/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClientTest.kt b/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClientTest.kt index 22fcad92bf..d9b21dc4c7 100644 --- a/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClientTest.kt +++ b/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClientTest.kt @@ -35,14 +35,14 @@ class RustMatrixClientTest { private fun TestScope.createRustMatrixClient( sessionStore: SessionStore = InMemorySessionStore(), ) = RustMatrixClient( - client = FakeRustClient(), + innerClient = FakeRustClient(), baseDirectory = File(""), sessionStore = sessionStore, appCoroutineScope = this, sessionDelegate = aRustClientSessionDelegate( sessionStore = sessionStore, ), - syncService = FakeRustSyncService(), + innerSyncService = FakeRustSyncService(), dispatchers = testCoroutineDispatchers(), baseCacheDirectory = File(""), clock = FakeSystemClock(),