From bd5e1e6520281c2b3e48c67c74e7a8c19510cae0 Mon Sep 17 00:00:00 2001 From: ganfra Date: Fri, 28 Jul 2023 13:40:18 +0200 Subject: [PATCH 1/2] No crash when room is already destroyed... --- .../matrix/impl/room/RoomListExtensions.kt | 47 ++++++++++++------- .../matrix/impl/sync/SyncServiceExtension.kt | 5 +- .../impl/timeline/RoomTimelineExtensions.kt | 13 +++-- .../matrix/impl/util/CallbackFlow.kt | 7 ++- .../util/{TaskHandleBag.kt => TaskHandle.kt} | 8 +++- 5 files changed, 52 insertions(+), 28 deletions(-) rename libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/{TaskHandleBag.kt => TaskHandle.kt} (91%) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt index b74ab8968e..5d92bfde0b 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt @@ -16,6 +16,7 @@ package io.element.android.libraries.matrix.impl.room +import io.element.android.libraries.core.data.tryOrNull import io.element.android.libraries.matrix.impl.util.mxCallbackFlow import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.trySendBlocking @@ -40,9 +41,15 @@ fun RoomList.loadingStateFlow(): Flow = trySendBlocking(state) } } - val result = loadingState(listener) - send(result.state) - result.stateStream + tryOrNull { + val result = loadingState(listener) + try { + send(result.state) + } catch (exception: Exception) { + Timber.d("loadingStateFlow() initialState failed.") + } + result.stateStream + } }.buffer(Channel.UNLIMITED) fun RoomList.entriesFlow(onInitialList: suspend (List) -> Unit): Flow> = @@ -52,9 +59,27 @@ fun RoomList.entriesFlow(onInitialList: suspend (List) -> Unit): trySendBlocking(roomEntriesUpdate) } } - val result = entries(listener) - onInitialList(result.entries) - result.entriesStream + tryOrNull { + val result = entries(listener) + try { + onInitialList(result.entries) + } catch (exception: Exception) { + Timber.d(exception, "entriesFlow() onInitialList failed.") + } + result.entriesStream + } + }.buffer(Channel.UNLIMITED) + +fun RoomListService.stateFlow(): Flow = + mxCallbackFlow { + val listener = object : RoomListServiceStateListener { + override fun onUpdate(state: RoomListServiceState) { + trySendBlocking(state) + } + } + tryOrNull { + state(listener) + } }.buffer(Channel.UNLIMITED) fun RoomListService.roomOrNull(roomId: String): RoomListItem? { @@ -65,13 +90,3 @@ fun RoomListService.roomOrNull(roomId: String): RoomListItem? { return null } } - -fun RoomListService.stateFlow(): Flow = - mxCallbackFlow { - val listener = object : RoomListServiceStateListener { - override fun onUpdate(state: RoomListServiceState) { - trySendBlocking(state) - } - } - state(listener) - }.buffer(Channel.UNLIMITED) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/SyncServiceExtension.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/SyncServiceExtension.kt index 36dabb71f3..a8e366ff7b 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/SyncServiceExtension.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/SyncServiceExtension.kt @@ -16,6 +16,7 @@ package io.element.android.libraries.matrix.impl.sync +import io.element.android.libraries.core.data.tryOrNull import io.element.android.libraries.matrix.impl.util.mxCallbackFlow import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.trySendBlocking @@ -32,5 +33,7 @@ fun SyncService.stateFlow(): Flow = trySendBlocking(state) } } - state(listener) + tryOrNull { + state(listener) + } }.buffer(Channel.UNLIMITED) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt index 29b75a1dca..3c5bb26e30 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt @@ -16,6 +16,8 @@ package io.element.android.libraries.matrix.impl.timeline +import io.element.android.libraries.core.data.tryOrNull +import io.element.android.libraries.matrix.impl.util.cancelAndDestroy import io.element.android.libraries.matrix.impl.util.destroyAll import io.element.android.libraries.matrix.impl.util.mxCallbackFlow import kotlinx.coroutines.channels.Channel @@ -35,14 +37,14 @@ import timber.log.Timber internal fun Room.timelineDiffFlow(onInitialList: suspend (List) -> Unit): Flow = callbackFlow { - val roomId = id() - Timber.d("Open timelineDiffFlow for room $roomId") val listener = object : TimelineListener { override fun onUpdate(diff: TimelineDiff) { trySendBlocking(diff) } } var result: RoomTimelineListenerResult? = null + val roomId = tryOrNull { id() } + Timber.d("Open timelineDiffFlow for room $roomId") try { result = addTimelineListener(listener) onInitialList(result.items) @@ -51,8 +53,7 @@ internal fun Room.timelineDiffFlow(onInitialList: suspend (List) - } awaitClose { Timber.d("Close timelineDiffFlow for room $roomId") - result?.itemsStream?.cancel() - result?.itemsStream?.destroy() + result?.itemsStream?.cancelAndDestroy() result?.items?.destroyAll() } }.buffer(Channel.UNLIMITED) @@ -64,5 +65,7 @@ internal fun Room.backPaginationStatusFlow(): Flow = trySendBlocking(status) } } - subscribeToBackPaginationStatus(listener) + tryOrNull { + subscribeToBackPaginationStatus(listener) + } }.buffer(Channel.UNLIMITED) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt index a347973e89..283b5f7076 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt @@ -21,11 +21,10 @@ import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.flow.callbackFlow import org.matrix.rustcomponents.sdk.TaskHandle -internal fun mxCallbackFlow(block: suspend ProducerScope.() -> TaskHandle) = +internal fun mxCallbackFlow(block: suspend ProducerScope.() -> TaskHandle?) = callbackFlow { - val token: TaskHandle = block(this) + val token: TaskHandle? = block(this) awaitClose { - token.cancel() - token.destroy() + token?.cancelAndDestroy() } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandleBag.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt similarity index 91% rename from libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandleBag.kt rename to libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt index 9a21645351..937e73e72e 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandleBag.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt @@ -28,9 +28,13 @@ class TaskHandleBag(private val tokens: MutableSet = CopyOnWriteArra fun dispose() { tokens.forEach { - it.cancel() - it.destroy() + it.cancelAndDestroy() } tokens.clear() } } + +fun TaskHandle.cancelAndDestroy() { + cancel() + destroy() +} From e9802ffea6f198ef44d4e7c2620bc4c4e3173cac Mon Sep 17 00:00:00 2001 From: ganfra Date: Mon, 31 Jul 2023 11:36:28 +0200 Subject: [PATCH 2/2] Small changes after reviews --- .../matrix/impl/room/RoomListExtensions.kt | 33 ++++++++++--------- .../impl/timeline/RoomTimelineExtensions.kt | 13 ++++---- .../matrix/impl/util/CallbackFlow.kt | 4 +-- .../libraries/matrix/impl/util/TaskHandle.kt | 30 ++++++++--------- 4 files changed, 41 insertions(+), 39 deletions(-) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt index 5d92bfde0b..8e7047aaa4 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt @@ -22,6 +22,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.catch import org.matrix.rustcomponents.sdk.RoomList import org.matrix.rustcomponents.sdk.RoomListEntriesListener import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate @@ -41,15 +42,15 @@ fun RoomList.loadingStateFlow(): Flow = trySendBlocking(state) } } - tryOrNull { - val result = loadingState(listener) - try { - send(result.state) - } catch (exception: Exception) { - Timber.d("loadingStateFlow() initialState failed.") - } - result.stateStream + val result = loadingState(listener) + try { + send(result.state) + } catch (exception: Exception) { + Timber.d("loadingStateFlow() initialState failed.") } + result.stateStream + }.catch { + Timber.d(it, "loadingStateFlow() failed") }.buffer(Channel.UNLIMITED) fun RoomList.entriesFlow(onInitialList: suspend (List) -> Unit): Flow> = @@ -59,15 +60,15 @@ fun RoomList.entriesFlow(onInitialList: suspend (List) -> Unit): trySendBlocking(roomEntriesUpdate) } } - tryOrNull { - val result = entries(listener) - try { - onInitialList(result.entries) - } catch (exception: Exception) { - Timber.d(exception, "entriesFlow() onInitialList failed.") - } - result.entriesStream + val result = entries(listener) + try { + onInitialList(result.entries) + } catch (exception: Exception) { + Timber.d("entriesFlow() onInitialList failed.") } + result.entriesStream + }.catch { + Timber.d(it, "entriesFlow() failed") }.buffer(Channel.UNLIMITED) fun RoomListService.stateFlow(): Flow = diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt index 3c5bb26e30..9f8bccb1d0 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt @@ -26,10 +26,10 @@ import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.catch import org.matrix.rustcomponents.sdk.BackPaginationStatus import org.matrix.rustcomponents.sdk.BackPaginationStatusListener import org.matrix.rustcomponents.sdk.Room -import org.matrix.rustcomponents.sdk.RoomTimelineListenerResult import org.matrix.rustcomponents.sdk.TimelineDiff import org.matrix.rustcomponents.sdk.TimelineItem import org.matrix.rustcomponents.sdk.TimelineListener @@ -42,20 +42,21 @@ internal fun Room.timelineDiffFlow(onInitialList: suspend (List) - trySendBlocking(diff) } } - var result: RoomTimelineListenerResult? = null - val roomId = tryOrNull { id() } + val roomId = id() Timber.d("Open timelineDiffFlow for room $roomId") + val result = addTimelineListener(listener) try { - result = addTimelineListener(listener) onInitialList(result.items) } catch (exception: Exception) { Timber.d(exception, "Catch failure in timelineDiffFlow of room $roomId") } awaitClose { Timber.d("Close timelineDiffFlow for room $roomId") - result?.itemsStream?.cancelAndDestroy() - result?.items?.destroyAll() + result.itemsStream.cancelAndDestroy() + result.items.destroyAll() } + }.catch { + Timber.d(it, "timelineDiffFlow() failed") }.buffer(Channel.UNLIMITED) internal fun Room.backPaginationStatusFlow(): Flow = diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt index 283b5f7076..fbf393e587 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt @@ -23,8 +23,8 @@ import org.matrix.rustcomponents.sdk.TaskHandle internal fun mxCallbackFlow(block: suspend ProducerScope.() -> TaskHandle?) = callbackFlow { - val token: TaskHandle? = block(this) + val taskHandle: TaskHandle? = block(this) awaitClose { - token?.cancelAndDestroy() + taskHandle?.cancelAndDestroy() } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt index 937e73e72e..5842ba1546 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt @@ -19,22 +19,22 @@ package io.element.android.libraries.matrix.impl.util import org.matrix.rustcomponents.sdk.TaskHandle import java.util.concurrent.CopyOnWriteArraySet -class TaskHandleBag(private val tokens: MutableSet = CopyOnWriteArraySet()) : Set by tokens { - - operator fun plusAssign(taskHandle: TaskHandle?) { - if (taskHandle == null) return - tokens += taskHandle - } - - fun dispose() { - tokens.forEach { - it.cancelAndDestroy() - } - tokens.clear() - } -} - fun TaskHandle.cancelAndDestroy() { cancel() destroy() } + +class TaskHandleBag(private val taskHandles: MutableSet = CopyOnWriteArraySet()) : Set by taskHandles { + + operator fun plusAssign(taskHandle: TaskHandle?) { + if (taskHandle == null) return + taskHandles += taskHandle + } + + fun dispose() { + taskHandles.forEach { + it.cancelAndDestroy() + } + taskHandles.clear() + } +}