From 0e9af5f42a5bbfc5f57870f7976f1d8b4305e818 Mon Sep 17 00:00:00 2001 From: ganfra Date: Fri, 10 Apr 2026 20:45:18 +0200 Subject: [PATCH] Refactor live location shares to use callbackFlow --- .../impl/room/location/LiveLocationSharesFlow.kt | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/location/LiveLocationSharesFlow.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/location/LiveLocationSharesFlow.kt index efe2d0cd68..4f4ddac667 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/location/LiveLocationSharesFlow.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/location/LiveLocationSharesFlow.kt @@ -10,10 +10,12 @@ package io.element.android.libraries.matrix.impl.room.location import io.element.android.libraries.matrix.api.core.UserId import io.element.android.libraries.matrix.api.room.location.LastLocation import io.element.android.libraries.matrix.api.room.location.LiveLocationShare -import io.element.android.libraries.matrix.impl.util.mxCallbackFlow +import io.element.android.libraries.matrix.impl.util.cancelAndDestroy import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.callbackFlow import org.matrix.rustcomponents.sdk.LiveLocationShareListener import org.matrix.rustcomponents.sdk.LiveLocationShareUpdate import org.matrix.rustcomponents.sdk.RoomInterface @@ -38,9 +40,10 @@ fun RoomInterface.liveLocationSharesFlow(): Flow> { is LiveLocationShareUpdate.Truncate -> subList(update.length.toInt(), size).clear() } } - return mxCallbackFlow { + return callbackFlow { + val liveLocationShares = liveLocationShares() val shares: MutableList = ArrayList() - subscribeToLiveLocationShares(object : LiveLocationShareListener { + val taskHandle = liveLocationShares.subscribe(object : LiveLocationShareListener { override fun onUpdate(updates: List) { for (update in updates) { shares.applyUpdate(update) @@ -48,6 +51,10 @@ fun RoomInterface.liveLocationSharesFlow(): Flow> { trySend(shares) } }) + awaitClose { + taskHandle.cancelAndDestroy() + liveLocationShares.destroy() + } }.buffer(Channel.UNLIMITED) }