fix(coroutine) : make sure to switch coroutine context as rust sdk is not safe to use on main thread.

This commit is contained in:
ganfra 2025-01-14 12:30:57 +01:00
parent c7dbc7ad6c
commit 331f692881
4 changed files with 118 additions and 86 deletions

View file

@ -137,7 +137,11 @@ class RustMatrixClient(
private val innerRoomListService = innerSyncService.roomListService()
private val rustSyncService = RustSyncService(innerSyncService, sessionCoroutineScope)
private val rustSyncService = RustSyncService(
inner = innerSyncService,
dispatcher = sessionDispatcher,
sessionCoroutineScope = sessionCoroutineScope
)
private val pushersService = RustPushersService(
client = innerClient,
dispatchers = dispatchers,
@ -283,8 +287,8 @@ class RustMatrixClient(
}
}
override suspend fun findDM(userId: UserId): RoomId? {
return innerClient.getDmRoom(userId.value)?.use { RoomId(it.id()) }
override suspend fun findDM(userId: UserId): RoomId? = withContext(sessionDispatcher) {
innerClient.getDmRoom(userId.value)?.use { RoomId(it.id()) }
}
override suspend fun ignoreUser(userId: UserId): Result<Unit> = withContext(sessionDispatcher) {

View file

@ -329,7 +329,7 @@ class RustMatrixRoom(
}
}
override suspend fun updateRoomNotificationSettings(): Result<Unit> = withContext(coroutineDispatchers.io) {
override suspend fun updateRoomNotificationSettings(): Result<Unit> = withContext(roomDispatcher) {
val currentState = _roomNotificationSettingsStateFlow.value
val currentRoomNotificationSettings = currentState.roomNotificationSettings()
_roomNotificationSettingsStateFlow.value = MatrixRoomNotificationSettingsState.Pending(prevRoomNotificationSettings = currentRoomNotificationSettings)
@ -345,7 +345,7 @@ class RustMatrixRoom(
}
}
override suspend fun userRole(userId: UserId): Result<RoomMember.Role> = withContext(coroutineDispatchers.io) {
override suspend fun userRole(userId: UserId): Result<RoomMember.Role> = withContext(roomDispatcher) {
runCatching {
RoomMemberMapper.mapRole(innerRoom.suggestedRoleForUser(userId.value))
}
@ -429,56 +429,56 @@ class RustMatrixRoom(
}
}
override suspend fun canUserInvite(userId: UserId): Result<Boolean> {
return runCatching {
override suspend fun canUserInvite(userId: UserId): Result<Boolean> = withContext(roomDispatcher) {
runCatching {
innerRoom.canUserInvite(userId.value)
}
}
override suspend fun canUserKick(userId: UserId): Result<Boolean> {
return runCatching {
override suspend fun canUserKick(userId: UserId): Result<Boolean> = withContext(roomDispatcher) {
runCatching {
innerRoom.canUserKick(userId.value)
}
}
override suspend fun canUserBan(userId: UserId): Result<Boolean> {
return runCatching {
override suspend fun canUserBan(userId: UserId): Result<Boolean> = withContext(roomDispatcher) {
runCatching {
innerRoom.canUserBan(userId.value)
}
}
override suspend fun canUserRedactOwn(userId: UserId): Result<Boolean> {
return runCatching {
override suspend fun canUserRedactOwn(userId: UserId): Result<Boolean> = withContext(roomDispatcher) {
runCatching {
innerRoom.canUserRedactOwn(userId.value)
}
}
override suspend fun canUserRedactOther(userId: UserId): Result<Boolean> {
return runCatching {
override suspend fun canUserRedactOther(userId: UserId): Result<Boolean> = withContext(roomDispatcher) {
runCatching {
innerRoom.canUserRedactOther(userId.value)
}
}
override suspend fun canUserSendState(userId: UserId, type: StateEventType): Result<Boolean> {
return runCatching {
override suspend fun canUserSendState(userId: UserId, type: StateEventType): Result<Boolean> = withContext(roomDispatcher) {
runCatching {
innerRoom.canUserSendState(userId.value, type.map())
}
}
override suspend fun canUserSendMessage(userId: UserId, type: MessageEventType): Result<Boolean> {
return runCatching {
override suspend fun canUserSendMessage(userId: UserId, type: MessageEventType): Result<Boolean> = withContext(roomDispatcher) {
runCatching {
innerRoom.canUserSendMessage(userId.value, type.map())
}
}
override suspend fun canUserTriggerRoomNotification(userId: UserId): Result<Boolean> {
return runCatching {
override suspend fun canUserTriggerRoomNotification(userId: UserId): Result<Boolean> = withContext(roomDispatcher) {
runCatching {
innerRoom.canUserTriggerRoomNotification(userId.value)
}
}
override suspend fun canUserPinUnpin(userId: UserId): Result<Boolean> {
return runCatching {
override suspend fun canUserPinUnpin(userId: UserId): Result<Boolean> = withContext(roomDispatcher) {
runCatching {
innerRoom.canUserPinUnpin(userId.value)
}
}
@ -676,8 +676,10 @@ class RustMatrixRoom(
return liveTimeline.sendVoiceMessage(file, audioInfo, waveform, progressCallback)
}
override suspend fun typingNotice(isTyping: Boolean) = runCatching {
innerRoom.typingNotice(isTyping)
override suspend fun typingNotice(isTyping: Boolean) = withContext(roomDispatcher) {
runCatching {
innerRoom.typingNotice(isTyping)
}
}
override suspend fun generateWidgetWebViewUrl(
@ -685,32 +687,42 @@ class RustMatrixRoom(
clientId: String,
languageTag: String?,
theme: String?,
) = runCatching {
widgetSettings.generateWidgetWebViewUrl(innerRoom, clientId, languageTag, theme)
) = withContext(roomDispatcher) {
runCatching {
widgetSettings.generateWidgetWebViewUrl(innerRoom, clientId, languageTag, theme)
}
}
override fun getWidgetDriver(widgetSettings: MatrixWidgetSettings): Result<MatrixWidgetDriver> = runCatching {
RustWidgetDriver(
widgetSettings = widgetSettings,
room = innerRoom,
widgetCapabilitiesProvider = object : WidgetCapabilitiesProvider {
override fun acquireCapabilities(capabilities: WidgetCapabilities): WidgetCapabilities {
return getElementCallRequiredPermissions(sessionId.value, deviceId.value)
}
},
)
override fun getWidgetDriver(widgetSettings: MatrixWidgetSettings): Result<MatrixWidgetDriver> {
return runCatching {
RustWidgetDriver(
widgetSettings = widgetSettings,
room = innerRoom,
widgetCapabilitiesProvider = object : WidgetCapabilitiesProvider {
override fun acquireCapabilities(capabilities: WidgetCapabilities): WidgetCapabilities {
return getElementCallRequiredPermissions(sessionId.value, deviceId.value)
}
},
)
}
}
override suspend fun getPermalink(): Result<String> = runCatching {
innerRoom.matrixToPermalink()
override suspend fun getPermalink(): Result<String> = withContext(roomDispatcher) {
runCatching {
innerRoom.matrixToPermalink()
}
}
override suspend fun getPermalinkFor(eventId: EventId): Result<String> = runCatching {
innerRoom.matrixToEventPermalink(eventId.value)
override suspend fun getPermalinkFor(eventId: EventId): Result<String> = withContext(roomDispatcher) {
runCatching {
innerRoom.matrixToEventPermalink(eventId.value)
}
}
override suspend fun sendCallNotificationIfNeeded(): Result<Unit> = runCatching {
innerRoom.sendCallNotificationIfNeeded()
override suspend fun sendCallNotificationIfNeeded(): Result<Unit> = withContext(roomDispatcher) {
runCatching {
innerRoom.sendCallNotificationIfNeeded()
}
}
override suspend fun setSendQueueEnabled(enabled: Boolean) {
@ -722,35 +734,45 @@ class RustMatrixRoom(
}
}
override suspend fun saveComposerDraft(composerDraft: ComposerDraft): Result<Unit> = runCatching {
Timber.d("saveComposerDraft: $composerDraft into $roomId")
innerRoom.saveComposerDraft(composerDraft.into())
override suspend fun saveComposerDraft(composerDraft: ComposerDraft): Result<Unit> = withContext(roomDispatcher) {
runCatching {
Timber.d("saveComposerDraft: $composerDraft into $roomId")
innerRoom.saveComposerDraft(composerDraft.into())
}
}
override suspend fun loadComposerDraft(): Result<ComposerDraft?> = runCatching {
Timber.d("loadComposerDraft for $roomId")
innerRoom.loadComposerDraft()?.into()
override suspend fun loadComposerDraft(): Result<ComposerDraft?> = withContext(roomDispatcher) {
runCatching {
Timber.d("loadComposerDraft for $roomId")
innerRoom.loadComposerDraft()?.into()
}
}
override suspend fun clearComposerDraft(): Result<Unit> = runCatching {
Timber.d("clearComposerDraft for $roomId")
innerRoom.clearComposerDraft()
override suspend fun clearComposerDraft(): Result<Unit> = withContext(roomDispatcher) {
runCatching {
Timber.d("clearComposerDraft for $roomId")
innerRoom.clearComposerDraft()
}
}
override suspend fun ignoreDeviceTrustAndResend(devices: Map<UserId, List<DeviceId>>, sendHandle: SendHandle) = runCatching {
innerRoom.ignoreDeviceTrustAndResend(
devices = devices.entries.associate { entry ->
entry.key.value to entry.value.map { it.value }
},
sendHandle = (sendHandle as RustSendHandle).inner,
)
override suspend fun ignoreDeviceTrustAndResend(devices: Map<UserId, List<DeviceId>>, sendHandle: SendHandle) = withContext(roomDispatcher) {
runCatching {
innerRoom.ignoreDeviceTrustAndResend(
devices = devices.entries.associate { entry ->
entry.key.value to entry.value.map { it.value }
},
sendHandle = (sendHandle as RustSendHandle).inner,
)
}
}
override suspend fun withdrawVerificationAndResend(userIds: List<UserId>, sendHandle: SendHandle) = runCatching {
innerRoom.withdrawVerificationAndResend(
userIds = userIds.map { it.value },
sendHandle = (sendHandle as RustSendHandle).inner,
)
override suspend fun withdrawVerificationAndResend(userIds: List<UserId>, sendHandle: SendHandle) = withContext(roomDispatcher) {
runCatching {
innerRoom.withdrawVerificationAndResend(
userIds = userIds.map { it.value },
sendHandle = (sendHandle as RustSendHandle).inner,
)
}
}
private fun createTimeline(

View file

@ -9,6 +9,7 @@ package io.element.android.libraries.matrix.impl.sync
import io.element.android.libraries.matrix.api.sync.SyncService
import io.element.android.libraries.matrix.api.sync.SyncState
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.flow.SharingStarted
@ -24,31 +25,36 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.matrix.rustcomponents.sdk.SyncService as InnerSyncService
class RustSyncService(
private val innerSyncService: InnerSyncService,
private val inner: InnerSyncService,
private val dispatcher: CoroutineDispatcher,
sessionCoroutineScope: CoroutineScope
) : SyncService {
private val isServiceReady = AtomicBoolean(true)
override suspend fun startSync() = runCatching {
if (!isServiceReady.get()) {
Timber.d("Can't start sync: service is not ready")
return@runCatching
override suspend fun startSync() = withContext(dispatcher) {
runCatching {
if (!isServiceReady.get()) {
Timber.d("Can't start sync: service is not ready")
return@runCatching
}
Timber.i("Start sync")
inner.start()
}.onFailure {
Timber.d("Start sync failed: $it")
}
Timber.i("Start sync")
innerSyncService.start()
}.onFailure {
Timber.d("Start sync failed: $it")
}
override suspend fun stopSync() = runCatching {
if (!isServiceReady.get()) {
Timber.d("Can't stop sync: service is not ready")
return@runCatching
override suspend fun stopSync() = withContext(dispatcher) {
runCatching {
if (!isServiceReady.get()) {
Timber.d("Can't stop sync: service is not ready")
return@runCatching
}
Timber.i("Stop sync")
inner.stop()
}.onFailure {
Timber.d("Stop sync failed: $it")
}
Timber.i("Stop sync")
innerSyncService.stop()
}.onFailure {
Timber.d("Stop sync failed: $it")
}
suspend fun destroy() = withContext(NonCancellable) {
@ -59,7 +65,7 @@ class RustSyncService(
}
override val syncState: StateFlow<SyncState> =
innerSyncService.stateFlow()
inner.stateFlow()
.map(SyncServiceState::toSyncState)
.onEach { state ->
Timber.i("Sync state=$state")

View file

@ -158,8 +158,8 @@ class RustTimeline(
override val membershipChangeEventReceived: Flow<Unit> = timelineDiffProcessor.membershipChangeEventReceived
override suspend fun sendReadReceipt(eventId: EventId, receiptType: ReceiptType): Result<Unit> {
return runCatching {
override suspend fun sendReadReceipt(eventId: EventId, receiptType: ReceiptType): Result<Unit> = withContext(dispatcher) {
runCatching {
inner.sendReadReceipt(receiptType.toRustReceiptType(), eventId.value)
}
}
@ -590,8 +590,8 @@ class RustTimeline(
}
}
private suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> {
return runCatching {
private suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> = withContext(dispatcher) {
runCatching {
inner.fetchDetailsForEvent(eventId.value)
}
}