From 331f6928818ee2be42d4be95c471afa4673ede25 Mon Sep 17 00:00:00 2001 From: ganfra Date: Tue, 14 Jan 2025 12:30:57 +0100 Subject: [PATCH] fix(coroutine) : make sure to switch coroutine context as rust sdk is not safe to use on main thread. --- .../libraries/matrix/impl/RustMatrixClient.kt | 10 +- .../matrix/impl/room/RustMatrixRoom.kt | 144 ++++++++++-------- .../matrix/impl/sync/RustSyncService.kt | 42 ++--- .../matrix/impl/timeline/RustTimeline.kt | 8 +- 4 files changed, 118 insertions(+), 86 deletions(-) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt index 0b7ae4f407..bd85a7f849 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt @@ -137,7 +137,11 @@ class RustMatrixClient( private val innerRoomListService = innerSyncService.roomListService() - private val rustSyncService = RustSyncService(innerSyncService, sessionCoroutineScope) + private val rustSyncService = RustSyncService( + inner = innerSyncService, + dispatcher = sessionDispatcher, + sessionCoroutineScope = sessionCoroutineScope + ) private val pushersService = RustPushersService( client = innerClient, dispatchers = dispatchers, @@ -283,8 +287,8 @@ class RustMatrixClient( } } - override suspend fun findDM(userId: UserId): RoomId? { - return innerClient.getDmRoom(userId.value)?.use { RoomId(it.id()) } + override suspend fun findDM(userId: UserId): RoomId? = withContext(sessionDispatcher) { + innerClient.getDmRoom(userId.value)?.use { RoomId(it.id()) } } override suspend fun ignoreUser(userId: UserId): Result = withContext(sessionDispatcher) { diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt index bffbba5fef..7217b25dcd 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt @@ -329,7 +329,7 @@ class RustMatrixRoom( } } - override suspend fun updateRoomNotificationSettings(): Result = withContext(coroutineDispatchers.io) { + override suspend fun updateRoomNotificationSettings(): Result = withContext(roomDispatcher) { val currentState = _roomNotificationSettingsStateFlow.value val currentRoomNotificationSettings = currentState.roomNotificationSettings() _roomNotificationSettingsStateFlow.value = MatrixRoomNotificationSettingsState.Pending(prevRoomNotificationSettings = currentRoomNotificationSettings) @@ -345,7 +345,7 @@ class RustMatrixRoom( } } - override suspend fun userRole(userId: UserId): Result = withContext(coroutineDispatchers.io) { + override suspend fun userRole(userId: UserId): Result = withContext(roomDispatcher) { runCatching { RoomMemberMapper.mapRole(innerRoom.suggestedRoleForUser(userId.value)) } @@ -429,56 +429,56 @@ class RustMatrixRoom( } } - override suspend fun canUserInvite(userId: UserId): Result { - return runCatching { + override suspend fun canUserInvite(userId: UserId): Result = withContext(roomDispatcher) { + runCatching { innerRoom.canUserInvite(userId.value) } } - override suspend fun canUserKick(userId: UserId): Result { - return runCatching { + override suspend fun canUserKick(userId: UserId): Result = withContext(roomDispatcher) { + runCatching { innerRoom.canUserKick(userId.value) } } - override suspend fun canUserBan(userId: UserId): Result { - return runCatching { + override suspend fun canUserBan(userId: UserId): Result = withContext(roomDispatcher) { + runCatching { innerRoom.canUserBan(userId.value) } } - override suspend fun canUserRedactOwn(userId: UserId): Result { - return runCatching { + override suspend fun canUserRedactOwn(userId: UserId): Result = withContext(roomDispatcher) { + runCatching { innerRoom.canUserRedactOwn(userId.value) } } - override suspend fun canUserRedactOther(userId: UserId): Result { - return runCatching { + override suspend fun canUserRedactOther(userId: UserId): Result = withContext(roomDispatcher) { + runCatching { innerRoom.canUserRedactOther(userId.value) } } - override suspend fun canUserSendState(userId: UserId, type: StateEventType): Result { - return runCatching { + override suspend fun canUserSendState(userId: UserId, type: StateEventType): Result = withContext(roomDispatcher) { + runCatching { innerRoom.canUserSendState(userId.value, type.map()) } } - override suspend fun canUserSendMessage(userId: UserId, type: MessageEventType): Result { - return runCatching { + override suspend fun canUserSendMessage(userId: UserId, type: MessageEventType): Result = withContext(roomDispatcher) { + runCatching { innerRoom.canUserSendMessage(userId.value, type.map()) } } - override suspend fun canUserTriggerRoomNotification(userId: UserId): Result { - return runCatching { + override suspend fun canUserTriggerRoomNotification(userId: UserId): Result = withContext(roomDispatcher) { + runCatching { innerRoom.canUserTriggerRoomNotification(userId.value) } } - override suspend fun canUserPinUnpin(userId: UserId): Result { - return runCatching { + override suspend fun canUserPinUnpin(userId: UserId): Result = withContext(roomDispatcher) { + runCatching { innerRoom.canUserPinUnpin(userId.value) } } @@ -676,8 +676,10 @@ class RustMatrixRoom( return liveTimeline.sendVoiceMessage(file, audioInfo, waveform, progressCallback) } - override suspend fun typingNotice(isTyping: Boolean) = runCatching { - innerRoom.typingNotice(isTyping) + override suspend fun typingNotice(isTyping: Boolean) = withContext(roomDispatcher) { + runCatching { + innerRoom.typingNotice(isTyping) + } } override suspend fun generateWidgetWebViewUrl( @@ -685,32 +687,42 @@ class RustMatrixRoom( clientId: String, languageTag: String?, theme: String?, - ) = runCatching { - widgetSettings.generateWidgetWebViewUrl(innerRoom, clientId, languageTag, theme) + ) = withContext(roomDispatcher) { + runCatching { + widgetSettings.generateWidgetWebViewUrl(innerRoom, clientId, languageTag, theme) + } } - override fun getWidgetDriver(widgetSettings: MatrixWidgetSettings): Result = runCatching { - RustWidgetDriver( - widgetSettings = widgetSettings, - room = innerRoom, - widgetCapabilitiesProvider = object : WidgetCapabilitiesProvider { - override fun acquireCapabilities(capabilities: WidgetCapabilities): WidgetCapabilities { - return getElementCallRequiredPermissions(sessionId.value, deviceId.value) - } - }, - ) + override fun getWidgetDriver(widgetSettings: MatrixWidgetSettings): Result { + return runCatching { + RustWidgetDriver( + widgetSettings = widgetSettings, + room = innerRoom, + widgetCapabilitiesProvider = object : WidgetCapabilitiesProvider { + override fun acquireCapabilities(capabilities: WidgetCapabilities): WidgetCapabilities { + return getElementCallRequiredPermissions(sessionId.value, deviceId.value) + } + }, + ) + } } - override suspend fun getPermalink(): Result = runCatching { - innerRoom.matrixToPermalink() + override suspend fun getPermalink(): Result = withContext(roomDispatcher) { + runCatching { + innerRoom.matrixToPermalink() + } } - override suspend fun getPermalinkFor(eventId: EventId): Result = runCatching { - innerRoom.matrixToEventPermalink(eventId.value) + override suspend fun getPermalinkFor(eventId: EventId): Result = withContext(roomDispatcher) { + runCatching { + innerRoom.matrixToEventPermalink(eventId.value) + } } - override suspend fun sendCallNotificationIfNeeded(): Result = runCatching { - innerRoom.sendCallNotificationIfNeeded() + override suspend fun sendCallNotificationIfNeeded(): Result = withContext(roomDispatcher) { + runCatching { + innerRoom.sendCallNotificationIfNeeded() + } } override suspend fun setSendQueueEnabled(enabled: Boolean) { @@ -722,35 +734,45 @@ class RustMatrixRoom( } } - override suspend fun saveComposerDraft(composerDraft: ComposerDraft): Result = runCatching { - Timber.d("saveComposerDraft: $composerDraft into $roomId") - innerRoom.saveComposerDraft(composerDraft.into()) + override suspend fun saveComposerDraft(composerDraft: ComposerDraft): Result = withContext(roomDispatcher) { + runCatching { + Timber.d("saveComposerDraft: $composerDraft into $roomId") + innerRoom.saveComposerDraft(composerDraft.into()) + } } - override suspend fun loadComposerDraft(): Result = runCatching { - Timber.d("loadComposerDraft for $roomId") - innerRoom.loadComposerDraft()?.into() + override suspend fun loadComposerDraft(): Result = withContext(roomDispatcher) { + runCatching { + Timber.d("loadComposerDraft for $roomId") + innerRoom.loadComposerDraft()?.into() + } } - override suspend fun clearComposerDraft(): Result = runCatching { - Timber.d("clearComposerDraft for $roomId") - innerRoom.clearComposerDraft() + override suspend fun clearComposerDraft(): Result = withContext(roomDispatcher) { + runCatching { + Timber.d("clearComposerDraft for $roomId") + innerRoom.clearComposerDraft() + } } - override suspend fun ignoreDeviceTrustAndResend(devices: Map>, sendHandle: SendHandle) = runCatching { - innerRoom.ignoreDeviceTrustAndResend( - devices = devices.entries.associate { entry -> - entry.key.value to entry.value.map { it.value } - }, - sendHandle = (sendHandle as RustSendHandle).inner, - ) + override suspend fun ignoreDeviceTrustAndResend(devices: Map>, sendHandle: SendHandle) = withContext(roomDispatcher) { + runCatching { + innerRoom.ignoreDeviceTrustAndResend( + devices = devices.entries.associate { entry -> + entry.key.value to entry.value.map { it.value } + }, + sendHandle = (sendHandle as RustSendHandle).inner, + ) + } } - override suspend fun withdrawVerificationAndResend(userIds: List, sendHandle: SendHandle) = runCatching { - innerRoom.withdrawVerificationAndResend( - userIds = userIds.map { it.value }, - sendHandle = (sendHandle as RustSendHandle).inner, - ) + override suspend fun withdrawVerificationAndResend(userIds: List, sendHandle: SendHandle) = withContext(roomDispatcher) { + runCatching { + innerRoom.withdrawVerificationAndResend( + userIds = userIds.map { it.value }, + sendHandle = (sendHandle as RustSendHandle).inner, + ) + } } private fun createTimeline( diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/RustSyncService.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/RustSyncService.kt index 667f571201..dffe6f1b71 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/RustSyncService.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/RustSyncService.kt @@ -9,6 +9,7 @@ package io.element.android.libraries.matrix.impl.sync import io.element.android.libraries.matrix.api.sync.SyncService import io.element.android.libraries.matrix.api.sync.SyncState +import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.flow.SharingStarted @@ -24,31 +25,36 @@ import java.util.concurrent.atomic.AtomicBoolean import org.matrix.rustcomponents.sdk.SyncService as InnerSyncService class RustSyncService( - private val innerSyncService: InnerSyncService, + private val inner: InnerSyncService, + private val dispatcher: CoroutineDispatcher, sessionCoroutineScope: CoroutineScope ) : SyncService { private val isServiceReady = AtomicBoolean(true) - override suspend fun startSync() = runCatching { - if (!isServiceReady.get()) { - Timber.d("Can't start sync: service is not ready") - return@runCatching + override suspend fun startSync() = withContext(dispatcher) { + runCatching { + if (!isServiceReady.get()) { + Timber.d("Can't start sync: service is not ready") + return@runCatching + } + Timber.i("Start sync") + inner.start() + }.onFailure { + Timber.d("Start sync failed: $it") } - Timber.i("Start sync") - innerSyncService.start() - }.onFailure { - Timber.d("Start sync failed: $it") } - override suspend fun stopSync() = runCatching { - if (!isServiceReady.get()) { - Timber.d("Can't stop sync: service is not ready") - return@runCatching + override suspend fun stopSync() = withContext(dispatcher) { + runCatching { + if (!isServiceReady.get()) { + Timber.d("Can't stop sync: service is not ready") + return@runCatching + } + Timber.i("Stop sync") + inner.stop() + }.onFailure { + Timber.d("Stop sync failed: $it") } - Timber.i("Stop sync") - innerSyncService.stop() - }.onFailure { - Timber.d("Stop sync failed: $it") } suspend fun destroy() = withContext(NonCancellable) { @@ -59,7 +65,7 @@ class RustSyncService( } override val syncState: StateFlow = - innerSyncService.stateFlow() + inner.stateFlow() .map(SyncServiceState::toSyncState) .onEach { state -> Timber.i("Sync state=$state") diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt index 9f5ce3a606..7013b4bb78 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt @@ -158,8 +158,8 @@ class RustTimeline( override val membershipChangeEventReceived: Flow = timelineDiffProcessor.membershipChangeEventReceived - override suspend fun sendReadReceipt(eventId: EventId, receiptType: ReceiptType): Result { - return runCatching { + override suspend fun sendReadReceipt(eventId: EventId, receiptType: ReceiptType): Result = withContext(dispatcher) { + runCatching { inner.sendReadReceipt(receiptType.toRustReceiptType(), eventId.value) } } @@ -590,8 +590,8 @@ class RustTimeline( } } - private suspend fun fetchDetailsForEvent(eventId: EventId): Result { - return runCatching { + private suspend fun fetchDetailsForEvent(eventId: EventId): Result = withContext(dispatcher) { + runCatching { inner.fetchDetailsForEvent(eventId.value) } }