RoomDirectory : continue improving interactions
This commit is contained in:
parent
153e88dade
commit
b900818001
14 changed files with 171 additions and 86 deletions
|
|
@ -76,6 +76,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
|
|||
import kotlinx.coroutines.TimeoutCancellationException
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
|
|
@ -158,7 +159,6 @@ class RustMatrixClient(
|
|||
|
||||
private val roomDirectoryService = RustRoomDirectoryService(
|
||||
client = client,
|
||||
sessionCoroutineScope = sessionCoroutineScope,
|
||||
sessionDispatcher = sessionDispatcher,
|
||||
)
|
||||
|
||||
|
|
@ -441,7 +441,7 @@ class RustMatrixClient(
|
|||
runCatching { client.removeAvatar() }
|
||||
}
|
||||
|
||||
override suspend fun joinRoom(roomId: RoomId): Result<RoomId> = withContext(sessionDispatcher) {
|
||||
override suspend fun joinRoom(roomId: RoomId): Result<RoomId> = withContext(sessionDispatcher) {
|
||||
runCatching {
|
||||
client.joinRoomById(roomId.value).destroy()
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -19,64 +19,79 @@ package io.element.android.libraries.matrix.impl.roomdirectory
|
|||
import io.element.android.libraries.matrix.api.roomdirectory.RoomDescription
|
||||
import io.element.android.libraries.matrix.api.roomdirectory.RoomDirectoryList
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.debounce
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.flowOn
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
import org.matrix.rustcomponents.sdk.RoomDirectorySearch as InnerRoomDirectorySearch
|
||||
import org.matrix.rustcomponents.sdk.RoomDirectorySearch
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
|
||||
class RustRoomDirectoryList(
|
||||
private val inner: InnerRoomDirectorySearch,
|
||||
sessionCoroutineScope: CoroutineScope,
|
||||
sessionDispatcher: CoroutineDispatcher,
|
||||
private val inner: RoomDirectorySearch,
|
||||
coroutineScope: CoroutineScope,
|
||||
private val coroutineContext: CoroutineContext,
|
||||
) : RoomDirectoryList {
|
||||
|
||||
private val _items = MutableSharedFlow<List<RoomDescription>>(replay = 1)
|
||||
private val processor = RoomDirectorySearchProcessor(_items, sessionDispatcher, RoomDescriptionMapper())
|
||||
private val hasMoreToLoad = MutableStateFlow(true)
|
||||
private val items = MutableSharedFlow<List<RoomDescription>>(replay = 1)
|
||||
private val processor = RoomDirectorySearchProcessor(items, coroutineContext, RoomDescriptionMapper())
|
||||
|
||||
init {
|
||||
sessionCoroutineScope.launch(sessionDispatcher) {
|
||||
inner
|
||||
.resultsFlow()
|
||||
.onEach { updates ->
|
||||
processor.postUpdates(updates)
|
||||
}
|
||||
.launchIn(this)
|
||||
}
|
||||
launchIn(coroutineScope)
|
||||
}
|
||||
|
||||
private fun launchIn(coroutineScope: CoroutineScope) {
|
||||
inner
|
||||
.resultsFlow()
|
||||
.onEach { updates ->
|
||||
processor.postUpdates(updates)
|
||||
}
|
||||
.flowOn(coroutineContext)
|
||||
.launchIn(coroutineScope)
|
||||
}
|
||||
|
||||
override suspend fun filter(filter: String?, batchSize: Int): Result<Unit> {
|
||||
return try {
|
||||
return execute {
|
||||
inner.search(filter = filter, batchSize = batchSize.toUInt())
|
||||
Result.success(Unit)
|
||||
} catch (e: CancellationException) {
|
||||
throw e
|
||||
} catch (e: Exception) {
|
||||
Result.failure(e)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun loadMore(): Result<Unit> {
|
||||
return try {
|
||||
return execute {
|
||||
inner.nextPage()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun execute(action: suspend () -> Unit): Result<Unit> {
|
||||
return try {
|
||||
// We always assume there is more to load until we know there isn't.
|
||||
// As accessing hasMoreToLoad is otherwise blocked by the current action.
|
||||
hasMoreToLoad.value = true
|
||||
action()
|
||||
Result.success(Unit)
|
||||
} catch (e: CancellationException) {
|
||||
throw e
|
||||
} catch (e: Exception) {
|
||||
Result.failure(e)
|
||||
} finally {
|
||||
hasMoreToLoad.value = hasMoreToLoad()
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun hasMoreToLoad(): Boolean {
|
||||
private suspend fun hasMoreToLoad(): Boolean {
|
||||
return !inner.isAtLastPage()
|
||||
}
|
||||
|
||||
@OptIn(FlowPreview::class)
|
||||
override val items: Flow<List<RoomDescription>> = _items.debounce(200.milliseconds)
|
||||
override val state: Flow<RoomDirectoryList.State> =
|
||||
combine(hasMoreToLoad, items) { hasMoreToLoad, items ->
|
||||
RoomDirectoryList.State(
|
||||
hasMoreToLoad = hasMoreToLoad,
|
||||
items = items
|
||||
)
|
||||
}
|
||||
.flowOn(coroutineContext)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,12 +24,10 @@ import org.matrix.rustcomponents.sdk.Client
|
|||
|
||||
class RustRoomDirectoryService(
|
||||
private val client: Client,
|
||||
private val sessionCoroutineScope: CoroutineScope,
|
||||
private val sessionDispatcher: CoroutineDispatcher,
|
||||
) : RoomDirectoryService {
|
||||
|
||||
override fun createRoomDirectoryList(): RoomDirectoryList {
|
||||
return RustRoomDirectoryList(client.roomDirectorySearch(), sessionCoroutineScope, sessionDispatcher)
|
||||
override fun createRoomDirectoryList(scope: CoroutineScope): RoomDirectoryList {
|
||||
return RustRoomDirectoryList(client.roomDirectorySearch(), scope, sessionDispatcher)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue