client : rename some internals and do not destroy client related instances manually

This commit is contained in:
ganfra 2024-11-21 21:18:56 +01:00
parent 3cb8d4f54f
commit b5e7879d44
7 changed files with 58 additions and 68 deletions

View file

@ -118,44 +118,44 @@ import org.matrix.rustcomponents.sdk.RoomVisibility as RustRoomVisibility
import org.matrix.rustcomponents.sdk.SyncService as ClientSyncService
class RustMatrixClient(
private val client: Client,
private val innerClient: Client,
private val baseDirectory: File,
private val sessionStore: SessionStore,
private val appCoroutineScope: CoroutineScope,
private val sessionDelegate: RustClientSessionDelegate,
syncService: ClientSyncService,
innerSyncService: ClientSyncService,
dispatchers: CoroutineDispatchers,
baseCacheDirectory: File,
clock: SystemClock,
timelineEventTypeFilterFactory: TimelineEventTypeFilterFactory,
featureFlagService: FeatureFlagService,
) : MatrixClient {
override val sessionId: UserId = UserId(client.userId())
override val deviceId: DeviceId = DeviceId(client.deviceId())
override val sessionId: UserId = UserId(innerClient.userId())
override val deviceId: DeviceId = DeviceId(innerClient.deviceId())
override val sessionCoroutineScope = appCoroutineScope.childScope(dispatchers.main, "Session-$sessionId")
private val innerRoomListService = syncService.roomListService()
private val sessionDispatcher = dispatchers.io.limitedParallelism(64)
private val rustSyncService = RustSyncService(syncService, sessionCoroutineScope)
private val innerRoomListService = innerSyncService.roomListService()
private val rustSyncService = RustSyncService(innerSyncService, sessionCoroutineScope)
private val pushersService = RustPushersService(
client = client,
client = innerClient,
dispatchers = dispatchers,
)
private val notificationProcessSetup = NotificationProcessSetup.SingleProcess(syncService)
private val notificationClient = runBlocking { client.notificationClient(notificationProcessSetup) }
private val notificationService = RustNotificationService(notificationClient, dispatchers, clock)
private val notificationSettingsService = RustNotificationSettingsService(client, dispatchers)
private val notificationProcessSetup = NotificationProcessSetup.SingleProcess(innerSyncService)
private val innerNotificationClient = runBlocking { innerClient.notificationClient(notificationProcessSetup) }
private val notificationService = RustNotificationService(innerNotificationClient, dispatchers, clock)
private val notificationSettingsService = RustNotificationSettingsService(innerClient, dispatchers)
.apply { start() }
private val encryptionService = RustEncryptionService(
client = client,
client = innerClient,
syncService = rustSyncService,
sessionCoroutineScope = sessionCoroutineScope,
dispatchers = dispatchers,
)
private val roomDirectoryService = RustRoomDirectoryService(
client = client,
client = innerClient,
sessionDispatcher = sessionDispatcher,
)
@ -175,7 +175,7 @@ class RustMatrixClient(
)
private val verificationService = RustSessionVerificationService(
client = client,
client = innerClient,
isSyncServiceReady = rustSyncService.syncState.map { it == SyncState.Running },
sessionCoroutineScope = sessionCoroutineScope,
)
@ -198,26 +198,26 @@ class RustMatrixClient(
override val mediaLoader: MatrixMediaLoader = RustMediaLoader(
baseCacheDirectory = baseCacheDirectory,
dispatchers = dispatchers,
innerClient = client,
innerClient = innerClient,
)
private val roomMembershipObserver = RoomMembershipObserver()
private var clientDelegateTaskHandle: TaskHandle? = client.setDelegate(sessionDelegate)
private var clientDelegateTaskHandle: TaskHandle? = innerClient.setDelegate(sessionDelegate)
private val _userProfile: MutableStateFlow<MatrixUser> = MutableStateFlow(
MatrixUser(
userId = sessionId,
// TODO cache for displayName?
displayName = null,
avatarUrl = client.cachedAvatarUrl(),
avatarUrl = innerClient.cachedAvatarUrl(),
)
)
override val userProfile: StateFlow<MatrixUser> = _userProfile
override val ignoredUsersFlow = mxCallbackFlow<ImmutableList<UserId>> {
client.subscribeToIgnoredUsers(object : IgnoredUsersListener {
innerClient.subscribeToIgnoredUsers(object : IgnoredUsersListener {
override fun call(ignoredUserIds: List<String>) {
channel.trySend(ignoredUserIds.map(::UserId).toPersistentList())
}
@ -238,7 +238,7 @@ class RustMatrixClient(
override fun userIdServerName(): String {
return runCatching {
client.userIdServerName()
innerClient.userIdServerName()
}
.onFailure {
Timber.w(it, "Failed to get userIdServerName")
@ -249,7 +249,7 @@ class RustMatrixClient(
override suspend fun getUrl(url: String): Result<String> = withContext(sessionDispatcher) {
runCatching {
client.getUrl(url)
innerClient.getUrl(url)
}
}
@ -279,23 +279,23 @@ class RustMatrixClient(
.filter { roomSummary -> roomSummary.info.currentUserMembership == currentUserMembership }
.first()
// Ensure that the room is ready
.also { client.awaitRoomRemoteEcho(it.roomId.value) }
.also { innerClient.awaitRoomRemoteEcho(it.roomId.value) }
}
}
override suspend fun findDM(userId: UserId): RoomId? {
return client.getDmRoom(userId.value)?.use { RoomId(it.id()) }
return innerClient.getDmRoom(userId.value)?.use { RoomId(it.id()) }
}
override suspend fun ignoreUser(userId: UserId): Result<Unit> = withContext(sessionDispatcher) {
runCatching {
client.ignoreUser(userId.value)
innerClient.ignoreUser(userId.value)
}
}
override suspend fun unignoreUser(userId: UserId): Result<Unit> = withContext(sessionDispatcher) {
runCatching {
client.unignoreUser(userId.value)
innerClient.unignoreUser(userId.value)
}
}
@ -338,7 +338,7 @@ class RustMatrixClient(
},
canonicalAlias = createRoomParams.roomAliasName.getOrNull(),
)
val roomId = RoomId(client.createRoom(rustParams))
val roomId = RoomId(innerClient.createRoom(rustParams))
// Wait to receive the room back from the sync but do not returns failure if it fails.
try {
awaitRoom(roomId.toRoomIdOrAlias(), 30.seconds, CurrentUserMembership.JOINED)
@ -363,7 +363,7 @@ class RustMatrixClient(
override suspend fun getProfile(userId: UserId): Result<MatrixUser> = withContext(sessionDispatcher) {
runCatching {
client.getProfile(userId.value).let(UserProfileMapper::map)
innerClient.getProfile(userId.value).let(UserProfileMapper::map)
}
}
@ -373,28 +373,28 @@ class RustMatrixClient(
override suspend fun searchUsers(searchTerm: String, limit: Long): Result<MatrixSearchUserResults> =
withContext(sessionDispatcher) {
runCatching {
client.searchUsers(searchTerm, limit.toULong()).let(UserSearchResultMapper::map)
innerClient.searchUsers(searchTerm, limit.toULong()).let(UserSearchResultMapper::map)
}
}
override suspend fun setDisplayName(displayName: String): Result<Unit> =
withContext(sessionDispatcher) {
runCatching { client.setDisplayName(displayName) }
runCatching { innerClient.setDisplayName(displayName) }
}
override suspend fun uploadAvatar(mimeType: String, data: ByteArray): Result<Unit> =
withContext(sessionDispatcher) {
runCatching { client.uploadAvatar(mimeType, data) }
runCatching { innerClient.uploadAvatar(mimeType, data) }
}
override suspend fun removeAvatar(): Result<Unit> =
withContext(sessionDispatcher) {
runCatching { client.removeAvatar() }
runCatching { innerClient.removeAvatar() }
}
override suspend fun joinRoom(roomId: RoomId): Result<RoomSummary?> = withContext(sessionDispatcher) {
runCatching {
client.joinRoomById(roomId.value).destroy()
innerClient.joinRoomById(roomId.value).destroy()
try {
awaitRoom(roomId.toRoomIdOrAlias(), 10.seconds, CurrentUserMembership.JOINED)
} catch (e: Exception) {
@ -406,7 +406,7 @@ class RustMatrixClient(
override suspend fun joinRoomByIdOrAlias(roomIdOrAlias: RoomIdOrAlias, serverNames: List<String>): Result<RoomSummary?> = withContext(sessionDispatcher) {
runCatching {
client.joinRoomByIdOrAlias(
innerClient.joinRoomByIdOrAlias(
roomIdOrAlias = roomIdOrAlias.identifier,
serverNames = serverNames,
).destroy()
@ -423,7 +423,7 @@ class RustMatrixClient(
sessionDispatcher
) {
runCatching {
client.knock(roomIdOrAlias.identifier, message, serverNames).destroy()
innerClient.knock(roomIdOrAlias.identifier, message, serverNames).destroy()
try {
awaitRoom(roomIdOrAlias, 10.seconds, CurrentUserMembership.KNOCKED)
} catch (e: Exception) {
@ -435,19 +435,19 @@ class RustMatrixClient(
override suspend fun trackRecentlyVisitedRoom(roomId: RoomId): Result<Unit> = withContext(sessionDispatcher) {
runCatching {
client.trackRecentlyVisitedRoom(roomId.value)
innerClient.trackRecentlyVisitedRoom(roomId.value)
}
}
override suspend fun getRecentlyVisitedRooms(): Result<List<RoomId>> = withContext(sessionDispatcher) {
runCatching {
client.getRecentlyVisitedRooms().map(::RoomId)
innerClient.getRecentlyVisitedRooms().map(::RoomId)
}
}
override suspend fun resolveRoomAlias(roomAlias: RoomAlias): Result<Optional<ResolvedRoomAlias>> = withContext(sessionDispatcher) {
runCatching {
val result = client.resolveRoomAlias(roomAlias.value)?.let {
val result = innerClient.resolveRoomAlias(roomAlias.value)?.let {
ResolvedRoomAlias(
roomId = RoomId(it.roomId),
servers = it.servers,
@ -460,8 +460,8 @@ class RustMatrixClient(
override suspend fun getRoomPreviewInfo(roomIdOrAlias: RoomIdOrAlias, serverNames: List<String>): Result<RoomPreviewInfo> = withContext(sessionDispatcher) {
runCatching {
when (roomIdOrAlias) {
is RoomIdOrAlias.Alias -> client.getRoomPreviewFromRoomAlias(roomIdOrAlias.roomAlias.value)
is RoomIdOrAlias.Id -> client.getRoomPreviewFromRoomId(roomIdOrAlias.roomId.value, serverNames)
is RoomIdOrAlias.Alias -> innerClient.getRoomPreviewFromRoomAlias(roomIdOrAlias.roomAlias.value)
is RoomIdOrAlias.Id -> innerClient.getRoomPreviewFromRoomId(roomIdOrAlias.roomId.value, serverNames)
}.use { roomPreview ->
RoomPreviewInfoMapper.map(roomPreview.info())
}
@ -491,11 +491,6 @@ class RustMatrixClient(
clientDelegateTaskHandle?.cancelAndDestroy()
notificationSettingsService.destroy()
verificationService.destroy()
innerRoomListService.destroy()
notificationClient.destroy()
notificationProcessSetup.destroy()
encryptionService.destroy()
client.destroy()
}
override suspend fun getCacheSize(): Long {
@ -515,13 +510,13 @@ class RustMatrixClient(
withContext(sessionDispatcher) {
if (userInitiated) {
try {
result = client.logout()
result = innerClient.logout()
} catch (failure: Throwable) {
if (ignoreSdkError) {
Timber.e(failure, "Fail to call logout on HS. Still delete local files.")
} else {
// If the logout failed we need to restore the delegate
clientDelegateTaskHandle = client.setDelegate(sessionDelegate)
clientDelegateTaskHandle = innerClient.setDelegate(sessionDelegate)
Timber.e(failure, "Fail to call logout on HS.")
throw failure
}
@ -539,7 +534,7 @@ class RustMatrixClient(
override fun canDeactivateAccount(): Boolean {
return runCatching {
client.canDeactivateAccount()
innerClient.canDeactivateAccount()
}
.getOrNull()
.orFalse()
@ -553,7 +548,7 @@ class RustMatrixClient(
runCatching {
// First call without AuthData, should fail
val firstAttempt = runCatching {
client.deactivateAccount(
innerClient.deactivateAccount(
authData = null,
eraseData = eraseData,
)
@ -562,7 +557,7 @@ class RustMatrixClient(
Timber.w(firstAttempt.exceptionOrNull(), "Expected failure, try again")
// This is expected, try again with the password
runCatching {
client.deactivateAccount(
innerClient.deactivateAccount(
authData = AuthData.Password(
passwordDetails = AuthDataPasswordDetails(
identifier = sessionId.value,
@ -574,7 +569,7 @@ class RustMatrixClient(
}.onFailure {
Timber.e(it, "Failed to deactivate account")
// If the deactivation failed we need to restore the delegate
clientDelegateTaskHandle = client.setDelegate(sessionDelegate)
clientDelegateTaskHandle = innerClient.setDelegate(sessionDelegate)
throw it
}
}
@ -589,13 +584,13 @@ class RustMatrixClient(
override suspend fun getAccountManagementUrl(action: AccountManagementAction?): Result<String?> = withContext(sessionDispatcher) {
val rustAction = action?.toRustAction()
runCatching {
client.accountUrl(rustAction)
innerClient.accountUrl(rustAction)
}
}
override suspend fun uploadMedia(mimeType: String, data: ByteArray, progressCallback: ProgressCallback?): Result<String> = withContext(sessionDispatcher) {
runCatching {
client.uploadMedia(mimeType, data, progressCallback?.toProgressWatcher())
innerClient.uploadMedia(mimeType, data, progressCallback?.toProgressWatcher())
}
}
@ -622,13 +617,13 @@ class RustMatrixClient(
withContext(sessionDispatcher) {
Timber.i("setAllSendQueuesEnabled($enabled)")
tryOrNull {
client.enableAllSendQueues(enabled)
innerClient.enableAllSendQueues(enabled)
}
}
}
override fun sendQueueDisabledFlow(): Flow<RoomId> = mxCallbackFlow {
client.subscribeToSendQueueStatus(object : SendQueueRoomErrorListener {
innerClient.subscribeToSendQueueStatus(object : SendQueueRoomErrorListener {
override fun onError(roomId: String, error: ClientException) {
trySend(RoomId(roomId))
}
@ -637,13 +632,13 @@ class RustMatrixClient(
override suspend fun availableSlidingSyncVersions(): Result<List<SlidingSyncVersion>> = withContext(sessionDispatcher) {
runCatching {
client.availableSlidingSyncVersions().map { it.map() }
innerClient.availableSlidingSyncVersions().map { it.map() }
}
}
override suspend fun currentSlidingSyncVersion(): Result<SlidingSyncVersion> = withContext(sessionDispatcher) {
runCatching {
client.session().slidingSyncVersion.map()
innerClient.session().slidingSyncVersion.map()
}
}

View file

@ -77,12 +77,12 @@ class RustMatrixClientFactory @Inject constructor(
.finish()
return RustMatrixClient(
client = client,
innerClient = client,
baseDirectory = baseDirectory,
sessionStore = sessionStore,
appCoroutineScope = appCoroutineScope,
sessionDelegate = sessionDelegate,
syncService = syncService,
innerSyncService = syncService,
dispatchers = coroutineDispatchers,
baseCacheDirectory = cacheDirectory,
clock = clock,

View file

@ -94,10 +94,6 @@ internal class RustEncryptionService(
}
.stateIn(sessionCoroutineScope, SharingStarted.Eagerly, false)
fun destroy() {
service.destroy()
}
override suspend fun enableBackups(): Result<Unit> = withContext(dispatchers.io) {
runCatching {
service.enableBackups()

View file

@ -42,7 +42,6 @@ class RustNotificationSettingsService(
fun destroy() {
notificationSettings.setDelegate(null)
notificationSettings.destroy()
}
override suspend fun getRoomNotificationSettings(roomId: RoomId, isEncrypted: Boolean, isOneToOne: Boolean): Result<RoomNotificationSettings> =

View file

@ -10,12 +10,14 @@ package io.element.android.libraries.matrix.impl.sync
import io.element.android.libraries.matrix.api.sync.SyncService
import io.element.android.libraries.matrix.api.sync.SyncState
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.SyncServiceState
import timber.log.Timber
import java.util.concurrent.atomic.AtomicBoolean
@ -49,12 +51,11 @@ class RustSyncService(
Timber.d("Stop sync failed: $it")
}
suspend fun destroy() {
suspend fun destroy() = withContext(NonCancellable) {
// If the service was still running, stop it
stopSync()
Timber.d("Destroying sync service")
isServiceReady.set(false)
innerSyncService.destroy()
}
override val syncState: StateFlow<SyncState> =

View file

@ -218,7 +218,6 @@ class RustSessionVerificationService(
recoveryStateListenerTaskHandle.cancelAndDestroy()
if (this::verificationController.isInitialized) {
verificationController.setDelegate(null)
verificationController.destroy()
}
}

View file

@ -35,14 +35,14 @@ class RustMatrixClientTest {
private fun TestScope.createRustMatrixClient(
sessionStore: SessionStore = InMemorySessionStore(),
) = RustMatrixClient(
client = FakeRustClient(),
innerClient = FakeRustClient(),
baseDirectory = File(""),
sessionStore = sessionStore,
appCoroutineScope = this,
sessionDelegate = aRustClientSessionDelegate(
sessionStore = sessionStore,
),
syncService = FakeRustSyncService(),
innerSyncService = FakeRustSyncService(),
dispatchers = testCoroutineDispatchers(),
baseCacheDirectory = File(""),
clock = FakeSystemClock(),