feature(spaces) : introduce SpaceRoomList matrix api
This commit is contained in:
parent
7e0931c299
commit
aadd8b45e2
15 changed files with 277 additions and 115 deletions
|
|
@ -0,0 +1,56 @@
|
|||
/*
|
||||
* Copyright 2025 New Vector Ltd.
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
* Please see LICENSE files in the repository root for full details.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.spaces
|
||||
|
||||
import io.element.android.libraries.core.extensions.runCatchingExceptions
|
||||
import io.element.android.libraries.matrix.api.spaces.SpaceRoom
|
||||
import io.element.android.libraries.matrix.api.spaces.SpaceRoomList
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import uniffi.matrix_sdk_ui.SpaceRoomListPaginationState
|
||||
import org.matrix.rustcomponents.sdk.SpaceRoomList as InnerSpaceRoomList
|
||||
|
||||
class RustSpaceRoomList(
|
||||
private val inner: InnerSpaceRoomList,
|
||||
sessionCoroutineScope: CoroutineScope,
|
||||
spaceRoomMapper: SpaceRoomMapper,
|
||||
) : SpaceRoomList {
|
||||
override val spaceRoomsFlow = MutableSharedFlow<List<SpaceRoom>>(replay = 1, extraBufferCapacity = Int.MAX_VALUE)
|
||||
override val paginationStatusFlow = MutableStateFlow(inner.paginationState().into())
|
||||
private val spaceListUpdateProcessor = SpaceListUpdateProcessor(spaceRoomsFlow, spaceRoomMapper)
|
||||
|
||||
init {
|
||||
inner.paginationStateFlow()
|
||||
.onEach { paginationStatus ->
|
||||
paginationStatusFlow.emit(paginationStatus.into())
|
||||
}
|
||||
.launchIn(sessionCoroutineScope)
|
||||
|
||||
inner.spaceListUpdateFlow()
|
||||
.onEach { updates ->
|
||||
spaceListUpdateProcessor.postUpdates(updates)
|
||||
}
|
||||
.launchIn(sessionCoroutineScope)
|
||||
}
|
||||
|
||||
override suspend fun paginate(): Result<Unit> {
|
||||
return runCatchingExceptions {
|
||||
inner.paginate()
|
||||
}
|
||||
}
|
||||
|
||||
private fun SpaceRoomListPaginationState.into(): SpaceRoomList.PaginationStatus {
|
||||
return when (this) {
|
||||
is SpaceRoomListPaginationState.Idle -> SpaceRoomList.PaginationStatus.Idle(hasMoreToLoad = !endReached)
|
||||
SpaceRoomListPaginationState.Loading -> SpaceRoomList.PaginationStatus.Loading
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -8,7 +8,9 @@
|
|||
package io.element.android.libraries.matrix.impl.spaces
|
||||
|
||||
import io.element.android.libraries.core.extensions.runCatchingExceptions
|
||||
import io.element.android.libraries.matrix.api.core.SpaceId
|
||||
import io.element.android.libraries.matrix.api.spaces.SpaceRoom
|
||||
import io.element.android.libraries.matrix.api.spaces.SpaceRoomList
|
||||
import io.element.android.libraries.matrix.api.spaces.SpaceService
|
||||
import io.element.android.libraries.matrix.impl.util.cancelAndDestroy
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
|
|
@ -21,11 +23,8 @@ import kotlinx.coroutines.flow.MutableSharedFlow
|
|||
import kotlinx.coroutines.flow.buffer
|
||||
import kotlinx.coroutines.flow.callbackFlow
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.SpaceListUpdate
|
||||
import org.matrix.rustcomponents.sdk.SpaceServiceInterface
|
||||
|
|
@ -38,104 +37,49 @@ class RustSpaceService(
|
|||
private val sessionCoroutineScope: CoroutineScope,
|
||||
private val sessionDispatcher: CoroutineDispatcher,
|
||||
) : SpaceService {
|
||||
private val mapper = SpaceRoomMapper()
|
||||
private val mutex = Mutex()
|
||||
|
||||
override val spaceRooms = MutableSharedFlow<List<SpaceRoom>>(replay = 1, extraBufferCapacity = 1)
|
||||
private val spaceRoomMapper = SpaceRoomMapper()
|
||||
override val spaceRoomsFlow = MutableSharedFlow<List<SpaceRoom>>(replay = 1, extraBufferCapacity = 1)
|
||||
private val spaceListUpdateProcessor = SpaceListUpdateProcessor(spaceRoomsFlow, spaceRoomMapper)
|
||||
|
||||
override suspend fun joinedSpaces(): Result<List<SpaceRoom>> = withContext(sessionDispatcher) {
|
||||
runCatchingExceptions {
|
||||
innerSpaceService.joinedSpaces()
|
||||
.map {
|
||||
it.let(mapper::map)
|
||||
it.let(spaceRoomMapper::map)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// override suspend fun spaceRoomList(spaceId: SpaceId): Result<List<SpaceRoom>> = withContext(sessionDispatcher) {
|
||||
// runCatchingExceptions {
|
||||
// innerSpaceService.spaceRoomList(spaceId.value)
|
||||
// }
|
||||
// }
|
||||
override suspend fun spaceRoomList(spaceId: SpaceId): SpaceRoomList {
|
||||
val innerSpaceRoomList = innerSpaceService.spaceRoomList(spaceId.value)
|
||||
return RustSpaceRoomList(
|
||||
inner = innerSpaceRoomList,
|
||||
sessionCoroutineScope = sessionCoroutineScope,
|
||||
spaceRoomMapper = spaceRoomMapper
|
||||
)
|
||||
}
|
||||
|
||||
init {
|
||||
innerSpaceService
|
||||
.spaceDiffFlow()
|
||||
.onEach {
|
||||
handeUpdate(it)
|
||||
.spaceListUpdate()
|
||||
.onEach { updates ->
|
||||
spaceListUpdateProcessor.postUpdates(updates)
|
||||
}
|
||||
.launchIn(sessionCoroutineScope)
|
||||
}
|
||||
|
||||
private suspend fun handeUpdate(spaceListUpdates: List<SpaceListUpdate>) {
|
||||
mutex.withLock {
|
||||
val current = if (spaceRooms.replayCache.isNotEmpty()) {
|
||||
spaceRooms.first().toMutableList()
|
||||
} else {
|
||||
mutableListOf()
|
||||
}
|
||||
spaceListUpdates.forEach { update ->
|
||||
current.applyUpdate(update)
|
||||
}
|
||||
spaceRooms.emit(current)
|
||||
}
|
||||
}
|
||||
|
||||
private fun MutableList<SpaceRoom>.applyUpdate(update: SpaceListUpdate) {
|
||||
when (update) {
|
||||
is SpaceListUpdate.Append -> {
|
||||
val newSpaces = update.values.map(mapper::map)
|
||||
addAll(newSpaces)
|
||||
}
|
||||
SpaceListUpdate.Clear -> clear()
|
||||
is SpaceListUpdate.Insert -> {
|
||||
val newSpace = mapper.map(update.value)
|
||||
add(update.index.toInt(), newSpace)
|
||||
}
|
||||
SpaceListUpdate.PopBack -> {
|
||||
removeAt(lastIndex)
|
||||
}
|
||||
SpaceListUpdate.PopFront -> {
|
||||
removeAt(0)
|
||||
}
|
||||
is SpaceListUpdate.PushBack -> {
|
||||
val newSpace = mapper.map(update.value)
|
||||
add(newSpace)
|
||||
}
|
||||
is SpaceListUpdate.PushFront -> {
|
||||
val newSpace = mapper.map(update.value)
|
||||
add(0, newSpace)
|
||||
}
|
||||
is SpaceListUpdate.Remove -> {
|
||||
removeAt(update.index.toInt())
|
||||
}
|
||||
is SpaceListUpdate.Reset -> {
|
||||
clear()
|
||||
val newSpaces = update.values.map(mapper::map)
|
||||
addAll(newSpaces)
|
||||
}
|
||||
is SpaceListUpdate.Set -> {
|
||||
val newSpace = mapper.map(update.value)
|
||||
this[update.index.toInt()] = newSpace
|
||||
}
|
||||
is SpaceListUpdate.Truncate -> {
|
||||
subList(update.length.toInt(), size).clear()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal fun SpaceServiceInterface.spaceDiffFlow(): Flow<List<SpaceListUpdate>> =
|
||||
internal fun SpaceServiceInterface.spaceListUpdate(): Flow<List<SpaceListUpdate>> =
|
||||
callbackFlow {
|
||||
val listener = object : SpaceServiceJoinedSpacesListener {
|
||||
override fun onUpdate(roomUpdates: List<SpaceListUpdate>) {
|
||||
trySendBlocking(roomUpdates)
|
||||
}
|
||||
}
|
||||
Timber.d("Open spaceDiffFlow for SpaceServiceInterface ${this@spaceDiffFlow}")
|
||||
Timber.d("Open spaceDiffFlow for SpaceServiceInterface ${this@spaceListUpdate}")
|
||||
val taskHandle = subscribeToJoinedSpaces(listener)
|
||||
awaitClose {
|
||||
Timber.d("Close spaceDiffFlow for SpaceServiceInterface ${this@spaceDiffFlow}")
|
||||
Timber.d("Close spaceDiffFlow for SpaceServiceInterface ${this@spaceListUpdate}")
|
||||
taskHandle.cancelAndDestroy()
|
||||
}
|
||||
}.catch {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Copyright 2023, 2024 New Vector Ltd.
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
* Please see LICENSE files in the repository root for full details.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.spaces
|
||||
|
||||
import io.element.android.libraries.matrix.api.spaces.SpaceRoom
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import org.matrix.rustcomponents.sdk.SpaceListUpdate
|
||||
import timber.log.Timber
|
||||
|
||||
internal class SpaceListUpdateProcessor(
|
||||
private val spaceRoomsFlow: MutableSharedFlow<List<SpaceRoom>>,
|
||||
private val mapper: SpaceRoomMapper,
|
||||
) {
|
||||
private val mutex = Mutex()
|
||||
|
||||
suspend fun postUpdates(updates: List<SpaceListUpdate>) {
|
||||
Timber.v("Update space rooms from postUpdates (with ${updates.size} items) on ${Thread.currentThread()}")
|
||||
updateSpaceRooms {
|
||||
updates.forEach { update -> applyUpdate(update) }
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun updateSpaceRooms(block: MutableList<SpaceRoom>.() -> Unit) =
|
||||
mutex.withLock {
|
||||
val spaceRooms = if (spaceRoomsFlow.replayCache.isNotEmpty()) {
|
||||
spaceRoomsFlow.first().toMutableList()
|
||||
} else {
|
||||
mutableListOf()
|
||||
}
|
||||
block(spaceRooms)
|
||||
spaceRoomsFlow.tryEmit(spaceRooms)
|
||||
}
|
||||
|
||||
private fun MutableList<SpaceRoom>.applyUpdate(update: SpaceListUpdate) {
|
||||
when (update) {
|
||||
is SpaceListUpdate.Append -> {
|
||||
val newSpaces = update.values.map { it ->
|
||||
it.let(mapper::map)
|
||||
}
|
||||
addAll(newSpaces)
|
||||
}
|
||||
SpaceListUpdate.Clear -> clear()
|
||||
is SpaceListUpdate.Insert -> {
|
||||
val newSpace = mapper.map(update.value)
|
||||
add(update.index.toInt(), newSpace)
|
||||
}
|
||||
SpaceListUpdate.PopBack -> {
|
||||
removeAt(lastIndex)
|
||||
}
|
||||
SpaceListUpdate.PopFront -> {
|
||||
removeAt(0)
|
||||
}
|
||||
is SpaceListUpdate.PushBack -> {
|
||||
val newSpace = mapper.map(update.value)
|
||||
add(newSpace)
|
||||
}
|
||||
is SpaceListUpdate.PushFront -> {
|
||||
val newSpace = mapper.map(update.value)
|
||||
add(0, newSpace)
|
||||
}
|
||||
is SpaceListUpdate.Remove -> {
|
||||
removeAt(update.index.toInt())
|
||||
}
|
||||
is SpaceListUpdate.Reset -> {
|
||||
clear()
|
||||
val newSpaces = update.values.map(mapper::map)
|
||||
addAll(newSpaces)
|
||||
}
|
||||
is SpaceListUpdate.Set -> {
|
||||
val newSpace = mapper.map(update.value)
|
||||
this[update.index.toInt()] = newSpace
|
||||
}
|
||||
is SpaceListUpdate.Truncate -> {
|
||||
subList(update.length.toInt(), size).clear()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Copyright 2025 New Vector Ltd.
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
* Please see LICENSE files in the repository root for full details.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.spaces
|
||||
|
||||
import io.element.android.libraries.matrix.impl.util.cancelAndDestroy
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.awaitClose
|
||||
import kotlinx.coroutines.channels.trySendBlocking
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.buffer
|
||||
import kotlinx.coroutines.flow.callbackFlow
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import org.matrix.rustcomponents.sdk.SpaceListUpdate
|
||||
import org.matrix.rustcomponents.sdk.SpaceRoomListEntriesListener
|
||||
import org.matrix.rustcomponents.sdk.SpaceRoomListInterface
|
||||
import org.matrix.rustcomponents.sdk.SpaceRoomListPaginationStateListener
|
||||
import timber.log.Timber
|
||||
import uniffi.matrix_sdk_ui.SpaceRoomListPaginationState
|
||||
|
||||
internal fun SpaceRoomListInterface.paginationStateFlow(): Flow<SpaceRoomListPaginationState> = callbackFlow {
|
||||
val listener = object : SpaceRoomListPaginationStateListener {
|
||||
override fun onUpdate(paginationState: SpaceRoomListPaginationState) {
|
||||
trySend(paginationState)
|
||||
}
|
||||
}
|
||||
val result = subscribeToPaginationStateUpdates(listener)
|
||||
awaitClose {
|
||||
result.cancelAndDestroy()
|
||||
}
|
||||
}.catch {
|
||||
Timber.d(it, "paginationStateFlow() failed")
|
||||
}.buffer(Channel.UNLIMITED)
|
||||
|
||||
internal fun SpaceRoomListInterface.spaceListUpdateFlow(): Flow<List<SpaceListUpdate>> =
|
||||
callbackFlow {
|
||||
val listener = object : SpaceRoomListEntriesListener {
|
||||
override fun onUpdate(rooms: List<SpaceListUpdate>) {
|
||||
trySendBlocking(rooms)
|
||||
}
|
||||
}
|
||||
Timber.d("Open spaceListUpdateFlow for SpaceRoomListInterface ${this@spaceListUpdateFlow}")
|
||||
val taskHandle = subscribeToRoomUpdate(listener)
|
||||
awaitClose {
|
||||
Timber.d("Close spaceListUpdateFlow for SpaceRoomListInterface ${this@spaceListUpdateFlow}")
|
||||
taskHandle.cancelAndDestroy()
|
||||
}
|
||||
}.catch {
|
||||
Timber.d(it, "spaceListUpdateFlow() failed")
|
||||
}.buffer(Channel.UNLIMITED)
|
||||
|
|
@ -9,6 +9,7 @@ package io.element.android.libraries.matrix.impl.spaces
|
|||
|
||||
import io.element.android.libraries.core.bool.orFalse
|
||||
import io.element.android.libraries.matrix.api.core.RoomAlias
|
||||
import io.element.android.libraries.matrix.api.core.RoomId
|
||||
import io.element.android.libraries.matrix.api.core.SpaceId
|
||||
import io.element.android.libraries.matrix.api.spaces.SpaceRoom
|
||||
import io.element.android.libraries.matrix.impl.room.join.map
|
||||
|
|
@ -26,7 +27,7 @@ class SpaceRoomMapper {
|
|||
joinRule = spaceRoom.joinRule?.map(),
|
||||
name = spaceRoom.name,
|
||||
numJoinedMembers = spaceRoom.numJoinedMembers.toInt(),
|
||||
spaceId = spaceRoom.roomId.let(::SpaceId),
|
||||
roomId = RoomId(spaceRoom.roomId),
|
||||
roomType = spaceRoom.roomType.map(),
|
||||
state = spaceRoom.state?.map(),
|
||||
topic = spaceRoom.topic,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue