From 0a59e974ae214afdf0178dfd8dbe3f53d1fb829a Mon Sep 17 00:00:00 2001 From: ganfra Date: Fri, 21 Jul 2023 10:24:57 +0200 Subject: [PATCH] Deadlock: makes sure timelineListener TaskHandle.cancel is called (and memory is released correctly) --- .../matrix/impl/room/RustMatrixRoom.kt | 8 +++--- .../impl/timeline/RoomTimelineExtensions.kt | 25 ++++++++++++++++--- .../impl/timeline/RustMatrixTimeline.kt | 9 ++++--- 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt index 16434aa3c8..b185c34205 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt @@ -122,10 +122,12 @@ class RustMatrixRoom( innerRoom.timelineDiffFlow { initialList -> _timeline.postItems(initialList) }.onEach { diff -> - if (diff.eventOrigin() == EventItemOrigin.SYNC) { - _syncUpdateFlow.value = systemClock.epochMillis() + diff.use { + if (diff.eventOrigin() == EventItemOrigin.SYNC) { + _syncUpdateFlow.value = systemClock.epochMillis() + } + _timeline.postDiff(diff) } - _timeline.postDiff(diff) }.launchIn(this) innerRoom.backPaginationStatusFlow() diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt index d6febb32dc..29b75a1dca 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt @@ -16,28 +16,45 @@ package io.element.android.libraries.matrix.impl.timeline +import io.element.android.libraries.matrix.impl.util.destroyAll import io.element.android.libraries.matrix.impl.util.mxCallbackFlow import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.callbackFlow import org.matrix.rustcomponents.sdk.BackPaginationStatus import org.matrix.rustcomponents.sdk.BackPaginationStatusListener import org.matrix.rustcomponents.sdk.Room +import org.matrix.rustcomponents.sdk.RoomTimelineListenerResult import org.matrix.rustcomponents.sdk.TimelineDiff import org.matrix.rustcomponents.sdk.TimelineItem import org.matrix.rustcomponents.sdk.TimelineListener +import timber.log.Timber internal fun Room.timelineDiffFlow(onInitialList: suspend (List) -> Unit): Flow = - mxCallbackFlow { + callbackFlow { + val roomId = id() + Timber.d("Open timelineDiffFlow for room $roomId") val listener = object : TimelineListener { override fun onUpdate(diff: TimelineDiff) { trySendBlocking(diff) } } - val result = addTimelineListener(listener) - onInitialList(result.items) - result.itemsStream + var result: RoomTimelineListenerResult? = null + try { + result = addTimelineListener(listener) + onInitialList(result.items) + } catch (exception: Exception) { + Timber.d(exception, "Catch failure in timelineDiffFlow of room $roomId") + } + awaitClose { + Timber.d("Close timelineDiffFlow for room $roomId") + result?.itemsStream?.cancel() + result?.itemsStream?.destroy() + result?.items?.destroyAll() + } }.buffer(Channel.UNLIMITED) internal fun Room.backPaginationStatusFlow(): Flow = diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt index e213fb623c..5e7f4a2282 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt @@ -26,12 +26,14 @@ import io.element.android.libraries.matrix.impl.timeline.item.event.EventMessage import io.element.android.libraries.matrix.impl.timeline.item.event.EventTimelineItemMapper import io.element.android.libraries.matrix.impl.timeline.item.event.TimelineEventContentMapper import io.element.android.libraries.matrix.impl.timeline.item.virtual.VirtualTimelineItemMapper -import kotlinx.coroutines.CompletableDeferred import io.element.android.libraries.matrix.impl.timeline.postprocessor.TimelineEncryptedHistoryPostProcessor +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow @@ -46,8 +48,8 @@ import org.matrix.rustcomponents.sdk.Room import org.matrix.rustcomponents.sdk.TimelineDiff import org.matrix.rustcomponents.sdk.TimelineItem import timber.log.Timber -import java.util.concurrent.atomic.AtomicBoolean import java.util.Date +import java.util.concurrent.atomic.AtomicBoolean private const val INITIAL_MAX_SIZE = 50 @@ -99,9 +101,10 @@ class RustMatrixTimeline( encryptedHistoryPostProcessor.process(items) } - internal suspend fun postItems(items: List) { + internal suspend fun postItems(items: List) = coroutineScope { // Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap. items.chunked(INITIAL_MAX_SIZE).reversed().forEach { + ensureActive() timelineDiffProcessor.postItems(it) } isInit.set(true)