Deadlock: makes sure timelineListener TaskHandle.cancel is called (and memory is released correctly)
This commit is contained in:
parent
1583899261
commit
0a59e974ae
3 changed files with 32 additions and 10 deletions
|
|
@ -122,10 +122,12 @@ class RustMatrixRoom(
|
|||
innerRoom.timelineDiffFlow { initialList ->
|
||||
_timeline.postItems(initialList)
|
||||
}.onEach { diff ->
|
||||
if (diff.eventOrigin() == EventItemOrigin.SYNC) {
|
||||
_syncUpdateFlow.value = systemClock.epochMillis()
|
||||
diff.use {
|
||||
if (diff.eventOrigin() == EventItemOrigin.SYNC) {
|
||||
_syncUpdateFlow.value = systemClock.epochMillis()
|
||||
}
|
||||
_timeline.postDiff(diff)
|
||||
}
|
||||
_timeline.postDiff(diff)
|
||||
}.launchIn(this)
|
||||
|
||||
innerRoom.backPaginationStatusFlow()
|
||||
|
|
|
|||
|
|
@ -16,28 +16,45 @@
|
|||
|
||||
package io.element.android.libraries.matrix.impl.timeline
|
||||
|
||||
import io.element.android.libraries.matrix.impl.util.destroyAll
|
||||
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.awaitClose
|
||||
import kotlinx.coroutines.channels.trySendBlocking
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.buffer
|
||||
import kotlinx.coroutines.flow.callbackFlow
|
||||
import org.matrix.rustcomponents.sdk.BackPaginationStatus
|
||||
import org.matrix.rustcomponents.sdk.BackPaginationStatusListener
|
||||
import org.matrix.rustcomponents.sdk.Room
|
||||
import org.matrix.rustcomponents.sdk.RoomTimelineListenerResult
|
||||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineItem
|
||||
import org.matrix.rustcomponents.sdk.TimelineListener
|
||||
import timber.log.Timber
|
||||
|
||||
internal fun Room.timelineDiffFlow(onInitialList: suspend (List<TimelineItem>) -> Unit): Flow<TimelineDiff> =
|
||||
mxCallbackFlow {
|
||||
callbackFlow {
|
||||
val roomId = id()
|
||||
Timber.d("Open timelineDiffFlow for room $roomId")
|
||||
val listener = object : TimelineListener {
|
||||
override fun onUpdate(diff: TimelineDiff) {
|
||||
trySendBlocking(diff)
|
||||
}
|
||||
}
|
||||
val result = addTimelineListener(listener)
|
||||
onInitialList(result.items)
|
||||
result.itemsStream
|
||||
var result: RoomTimelineListenerResult? = null
|
||||
try {
|
||||
result = addTimelineListener(listener)
|
||||
onInitialList(result.items)
|
||||
} catch (exception: Exception) {
|
||||
Timber.d(exception, "Catch failure in timelineDiffFlow of room $roomId")
|
||||
}
|
||||
awaitClose {
|
||||
Timber.d("Close timelineDiffFlow for room $roomId")
|
||||
result?.itemsStream?.cancel()
|
||||
result?.itemsStream?.destroy()
|
||||
result?.items?.destroyAll()
|
||||
}
|
||||
}.buffer(Channel.UNLIMITED)
|
||||
|
||||
internal fun Room.backPaginationStatusFlow(): Flow<BackPaginationStatus> =
|
||||
|
|
|
|||
|
|
@ -26,12 +26,14 @@ import io.element.android.libraries.matrix.impl.timeline.item.event.EventMessage
|
|||
import io.element.android.libraries.matrix.impl.timeline.item.event.EventTimelineItemMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.event.TimelineEventContentMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.virtual.VirtualTimelineItemMapper
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import io.element.android.libraries.matrix.impl.timeline.postprocessor.TimelineEncryptedHistoryPostProcessor
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.ensureActive
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
|
|
@ -46,8 +48,8 @@ import org.matrix.rustcomponents.sdk.Room
|
|||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineItem
|
||||
import timber.log.Timber
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.Date
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
private const val INITIAL_MAX_SIZE = 50
|
||||
|
||||
|
|
@ -99,9 +101,10 @@ class RustMatrixTimeline(
|
|||
encryptedHistoryPostProcessor.process(items)
|
||||
}
|
||||
|
||||
internal suspend fun postItems(items: List<TimelineItem>) {
|
||||
internal suspend fun postItems(items: List<TimelineItem>) = coroutineScope {
|
||||
// Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap.
|
||||
items.chunked(INITIAL_MAX_SIZE).reversed().forEach {
|
||||
ensureActive()
|
||||
timelineDiffProcessor.postItems(it)
|
||||
}
|
||||
isInit.set(true)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue