Merge branch 'develop' into feature/fga/room_list_api
This commit is contained in:
commit
7ee3c1bf42
114 changed files with 2437 additions and 170 deletions
|
|
@ -40,6 +40,7 @@ import io.element.android.libraries.matrix.impl.core.toProgressWatcher
|
|||
import io.element.android.libraries.matrix.impl.media.RustMediaLoader
|
||||
import io.element.android.libraries.matrix.impl.notification.RustNotificationService
|
||||
import io.element.android.libraries.matrix.impl.pushers.RustPushersService
|
||||
import io.element.android.libraries.matrix.impl.room.RoomContentForwarder
|
||||
import io.element.android.libraries.matrix.impl.room.RustMatrixRoom
|
||||
import io.element.android.libraries.matrix.impl.room.RustRoomSummaryDataSource
|
||||
import io.element.android.libraries.matrix.impl.room.roomOrNull
|
||||
|
|
@ -79,7 +80,7 @@ class RustMatrixClient constructor(
|
|||
|
||||
override val sessionId: UserId = UserId(client.userId())
|
||||
|
||||
private val roomListService = client.roomList()
|
||||
private val roomListService = client.roomListService()
|
||||
private val sessionCoroutineScope = childScopeOf(appCoroutineScope, dispatchers.main, "Session-${sessionId}")
|
||||
private val verificationService = RustSessionVerificationService()
|
||||
private val syncService = RustSyncService(roomListService, sessionCoroutineScope)
|
||||
|
|
@ -112,6 +113,8 @@ class RustMatrixClient constructor(
|
|||
|
||||
private val roomMembershipObserver = RoomMembershipObserver()
|
||||
|
||||
private val roomContentForwarder = RoomContentForwarder(roomListService)
|
||||
|
||||
init {
|
||||
client.setDelegate(clientDelegate)
|
||||
syncService.syncState
|
||||
|
|
@ -132,7 +135,8 @@ class RustMatrixClient constructor(
|
|||
innerRoom = fullRoom,
|
||||
sessionCoroutineScope = sessionCoroutineScope,
|
||||
coroutineDispatchers = dispatchers,
|
||||
systemClock = clock
|
||||
systemClock = clock,
|
||||
roomContentForwarder = roomContentForwarder,
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ class RustNotificationService(
|
|||
eventId: EventId
|
||||
): Result<NotificationData?> {
|
||||
return runCatching {
|
||||
client.getNotificationItem(roomId.value, eventId.value).use(notificationMapper::map)
|
||||
client.getNotificationItem(roomId.value, eventId.value)?.use(notificationMapper::map)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Copyright (c) 2023 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.room
|
||||
|
||||
import io.element.android.libraries.core.coroutine.parallelMap
|
||||
import io.element.android.libraries.matrix.api.core.EventId
|
||||
import io.element.android.libraries.matrix.api.core.RoomId
|
||||
import io.element.android.libraries.matrix.api.room.ForwardEventException
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.withTimeout
|
||||
import org.matrix.rustcomponents.sdk.Room
|
||||
import org.matrix.rustcomponents.sdk.RoomListService
|
||||
import org.matrix.rustcomponents.sdk.SlidingSync
|
||||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineListener
|
||||
import org.matrix.rustcomponents.sdk.genTransactionId
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
|
||||
/**
|
||||
* Helper to forward event contents from a room to a set of other rooms.
|
||||
* @param slidingSync the [SlidingSync] to fetch room instances to forward the event to
|
||||
*/
|
||||
class RoomContentForwarder(
|
||||
private val roomListService: RoomListService,
|
||||
) {
|
||||
|
||||
/**
|
||||
* Forwards the event with the given [eventId] from the [fromRoom] to the given [toRoomIds].
|
||||
* @param fromRoom the room to forward the event from
|
||||
* @param eventId the id of the event to forward
|
||||
* @param toRoomIds the ids of the rooms to forward the event to
|
||||
* @param timeoutMs the maximum time in milliseconds to wait for the event to be sent to a room
|
||||
*/
|
||||
suspend fun forward(
|
||||
fromRoom: Room,
|
||||
eventId: EventId,
|
||||
toRoomIds: List<RoomId>,
|
||||
timeoutMs: Long = 5000L
|
||||
) {
|
||||
val content = fromRoom.getTimelineEventContentByEventId(eventId.value)
|
||||
val targetSlidingSyncRooms = toRoomIds.mapNotNull { roomId -> roomListService.roomOrNull(roomId.value) }
|
||||
val targetRooms = targetSlidingSyncRooms.mapNotNull { slidingSyncRoom -> slidingSyncRoom.use { it.fullRoom() } }
|
||||
val failedForwardingTo = mutableSetOf<RoomId>()
|
||||
targetRooms.parallelMap { room ->
|
||||
room.use { targetRoom ->
|
||||
val result = runCatching {
|
||||
// Sending a message requires a registered timeline listener
|
||||
targetRoom.addTimelineListener(NoOpTimelineListener)
|
||||
withTimeout(timeoutMs.milliseconds) {
|
||||
targetRoom.send(content, genTransactionId())
|
||||
}
|
||||
}
|
||||
// After sending, we remove the timeline
|
||||
targetRoom.removeTimeline()
|
||||
result
|
||||
}.onFailure {
|
||||
failedForwardingTo.add(RoomId(room.id()))
|
||||
if (it is CancellationException) {
|
||||
throw it
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (failedForwardingTo.isNotEmpty()) {
|
||||
throw ForwardEventException(toRoomIds.toList())
|
||||
}
|
||||
}
|
||||
|
||||
private object NoOpTimelineListener : TimelineListener {
|
||||
override fun onUpdate(diff: TimelineDiff) = Unit
|
||||
}
|
||||
}
|
||||
|
|
@ -3,40 +3,31 @@ package io.element.android.libraries.matrix.impl.room
|
|||
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
|
||||
import kotlinx.coroutines.channels.trySendBlocking
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import org.matrix.rustcomponents.sdk.RoomList
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntriesListener
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntry
|
||||
import org.matrix.rustcomponents.sdk.RoomListInterface
|
||||
import org.matrix.rustcomponents.sdk.RoomListItem
|
||||
import org.matrix.rustcomponents.sdk.RoomListState
|
||||
import org.matrix.rustcomponents.sdk.RoomListStateListener
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListLoadingState
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListStateObserver
|
||||
import org.matrix.rustcomponents.sdk.RoomListLoadingState
|
||||
import org.matrix.rustcomponents.sdk.RoomListLoadingStateListener
|
||||
import org.matrix.rustcomponents.sdk.RoomListService
|
||||
import org.matrix.rustcomponents.sdk.RoomListServiceState
|
||||
import org.matrix.rustcomponents.sdk.RoomListServiceStateListener
|
||||
import timber.log.Timber
|
||||
|
||||
fun RoomListInterface.roomListStateFlow(): Flow<RoomListState> =
|
||||
fun RoomList.loadingStateFlow(): Flow<RoomListLoadingState> =
|
||||
mxCallbackFlow {
|
||||
val listener = object : RoomListStateListener {
|
||||
override fun onUpdate(state: RoomListState) {
|
||||
val listener = object : RoomListLoadingStateListener {
|
||||
override fun onUpdate(state: RoomListLoadingState) {
|
||||
trySendBlocking(state)
|
||||
}
|
||||
}
|
||||
state(listener)
|
||||
val result = loadingState(listener)
|
||||
send(result.state)
|
||||
result.stateStream
|
||||
}
|
||||
|
||||
fun RoomListInterface.loadingStateFlow(): Flow<SlidingSyncListLoadingState> =
|
||||
mxCallbackFlow {
|
||||
val listener = object : SlidingSyncListStateObserver {
|
||||
override fun didReceiveUpdate(newState: SlidingSyncListLoadingState) {
|
||||
trySendBlocking(newState)
|
||||
}
|
||||
}
|
||||
val result = entriesLoadingState(listener)
|
||||
send(result.entriesLoadingState)
|
||||
result.entriesLoadingStateStream
|
||||
}
|
||||
|
||||
fun RoomListInterface.roomListEntriesUpdateFlow(onInitialList: suspend (List<RoomListEntry>) -> Unit): Flow<RoomListEntriesUpdate> =
|
||||
fun RoomList.entriesFlow(onInitialList: suspend (List<RoomListEntry>) -> Unit): Flow<RoomListEntriesUpdate> =
|
||||
mxCallbackFlow {
|
||||
val listener = object : RoomListEntriesListener {
|
||||
override fun onUpdate(roomEntriesUpdate: RoomListEntriesUpdate) {
|
||||
|
|
@ -48,7 +39,7 @@ fun RoomListInterface.roomListEntriesUpdateFlow(onInitialList: suspend (List<Roo
|
|||
result.entriesStream
|
||||
}
|
||||
|
||||
fun RoomListInterface.roomOrNull(roomId: String): RoomListItem? {
|
||||
fun RoomListService.roomOrNull(roomId: String): RoomListItem? {
|
||||
return try {
|
||||
room(roomId)
|
||||
} catch (failure: Throwable) {
|
||||
|
|
@ -56,3 +47,13 @@ fun RoomListInterface.roomOrNull(roomId: String): RoomListItem? {
|
|||
return null
|
||||
}
|
||||
}
|
||||
|
||||
fun RoomListService.stateFlow(): Flow<RoomListServiceState> =
|
||||
mxCallbackFlow {
|
||||
val listener = object : RoomListServiceStateListener {
|
||||
override fun onUpdate(state: RoomListServiceState) {
|
||||
trySendBlocking(state)
|
||||
}
|
||||
}
|
||||
state(listener)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -55,6 +55,7 @@ import org.matrix.rustcomponents.sdk.RoomMember
|
|||
import org.matrix.rustcomponents.sdk.RoomSubscription
|
||||
import org.matrix.rustcomponents.sdk.genTransactionId
|
||||
import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown
|
||||
import timber.log.Timber
|
||||
import java.io.File
|
||||
|
||||
class RustMatrixRoom(
|
||||
|
|
@ -64,6 +65,7 @@ class RustMatrixRoom(
|
|||
sessionCoroutineScope: CoroutineScope,
|
||||
private val coroutineDispatchers: CoroutineDispatchers,
|
||||
private val systemClock: SystemClock,
|
||||
private val roomContentForwarder: RoomContentForwarder,
|
||||
) : MatrixRoom {
|
||||
|
||||
override val roomId = RoomId(innerRoom.id())
|
||||
|
|
@ -307,6 +309,14 @@ class RustMatrixRoom(
|
|||
}
|
||||
}
|
||||
|
||||
override suspend fun forwardEvent(eventId: EventId, roomIds: List<RoomId>): Result<Unit> = withContext(coroutineDispatchers.io) {
|
||||
runCatching {
|
||||
roomContentForwarder.forward(fromRoom = innerRoom, eventId = eventId, toRoomIds = roomIds)
|
||||
}.onFailure {
|
||||
Timber.e(it)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun retrySendMessage(transactionId: String): Result<Unit> =
|
||||
withContext(coroutineDispatchers.io) {
|
||||
runCatching {
|
||||
|
|
@ -350,9 +360,19 @@ class RustMatrixRoom(
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private suspend fun fetchMembers() = withContext(coroutineDispatchers.io) {
|
||||
runCatching {
|
||||
innerRoom.fetchMembers()
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun reportContent(eventId: EventId, reason: String, blockUserId: UserId?): Result<Unit> = withContext(coroutineDispatchers.io) {
|
||||
runCatching {
|
||||
innerRoom.reportContent(eventId = eventId.value, score = null, reason = reason)
|
||||
if (blockUserId != null) {
|
||||
innerRoom.ignoreUser(blockUserId.value)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,13 +30,13 @@ import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate
|
|||
import org.matrix.rustcomponents.sdk.RoomListEntry
|
||||
import org.matrix.rustcomponents.sdk.RoomListException
|
||||
import org.matrix.rustcomponents.sdk.RoomListInput
|
||||
import org.matrix.rustcomponents.sdk.RoomListInterface
|
||||
import org.matrix.rustcomponents.sdk.RoomListRange
|
||||
import org.matrix.rustcomponents.sdk.RoomListService
|
||||
import timber.log.Timber
|
||||
import java.util.UUID
|
||||
|
||||
internal class RustRoomSummaryDataSource(
|
||||
private val roomListService: RoomListInterface,
|
||||
private val roomListService: RoomListService,
|
||||
private val sessionCoroutineScope: CoroutineScope,
|
||||
private val coroutineDispatchers: CoroutineDispatchers,
|
||||
private val roomSummaryDetailsFactory: RoomSummaryDetailsFactory = RoomSummaryDetailsFactory(),
|
||||
|
|
@ -48,7 +48,7 @@ internal class RustRoomSummaryDataSource(
|
|||
|
||||
fun init() {
|
||||
sessionCoroutineScope.launch(coroutineDispatchers.computation) {
|
||||
roomListService.roomListEntriesUpdateFlow { roomListEntries ->
|
||||
roomListService.allRooms().entriesFlow { roomListEntries ->
|
||||
roomList.value = roomListEntries.map(::buildSummaryForRoomListEntry)
|
||||
}.onEach { update ->
|
||||
roomList.getAndUpdate {
|
||||
|
|
|
|||
|
|
@ -17,14 +17,14 @@
|
|||
package io.element.android.libraries.matrix.impl.sync
|
||||
|
||||
import io.element.android.libraries.matrix.api.sync.SyncState
|
||||
import org.matrix.rustcomponents.sdk.RoomListState
|
||||
import org.matrix.rustcomponents.sdk.RoomListServiceState
|
||||
|
||||
internal fun RoomListState.toSyncState(): SyncState {
|
||||
internal fun RoomListServiceState.toSyncState(): SyncState {
|
||||
return when (this) {
|
||||
RoomListState.INIT,
|
||||
RoomListState.SETTING_UP -> SyncState.Idle
|
||||
RoomListState.RUNNING -> SyncState.Syncing
|
||||
RoomListState.ERROR -> SyncState.InError
|
||||
RoomListState.TERMINATED -> SyncState.Terminated
|
||||
RoomListServiceState.INIT,
|
||||
RoomListServiceState.SETTING_UP -> SyncState.Idle
|
||||
RoomListServiceState.RUNNING -> SyncState.Syncing
|
||||
RoomListServiceState.ERROR -> SyncState.InError
|
||||
RoomListServiceState.TERMINATED -> SyncState.Terminated
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,20 +18,18 @@ package io.element.android.libraries.matrix.impl.sync
|
|||
|
||||
import io.element.android.libraries.matrix.api.sync.SyncService
|
||||
import io.element.android.libraries.matrix.api.sync.SyncState
|
||||
import io.element.android.libraries.matrix.impl.room.roomListStateFlow
|
||||
import io.element.android.libraries.matrix.impl.room.stateFlow
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.flow.stateIn
|
||||
import org.matrix.rustcomponents.sdk.RoomList
|
||||
import org.matrix.rustcomponents.sdk.RoomListState
|
||||
import timber.log.Timber
|
||||
import org.matrix.rustcomponents.sdk.RoomListService
|
||||
import org.matrix.rustcomponents.sdk.RoomListServiceState
|
||||
|
||||
class RustSyncService(
|
||||
private val roomListService: RoomList,
|
||||
private val roomListService: RoomListService,
|
||||
private val sessionCoroutineScope: CoroutineScope
|
||||
) : SyncService {
|
||||
|
||||
|
|
@ -49,8 +47,8 @@ class RustSyncService(
|
|||
|
||||
override val syncState: StateFlow<SyncState> =
|
||||
roomListService
|
||||
.roomListStateFlow()
|
||||
.map(RoomListState::toSyncState)
|
||||
.stateFlow()
|
||||
.map(RoomListServiceState::toSyncState)
|
||||
.distinctUntilChanged()
|
||||
.stateIn(sessionCoroutineScope, SharingStarted.WhileSubscribed(), SyncState.Idle)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -70,6 +70,7 @@ fun RustEventSendState?.map(): EventSendState? {
|
|||
RustEventSendState.NotSentYet -> EventSendState.NotSentYet
|
||||
is RustEventSendState.SendingFailed -> EventSendState.SendingFailed(error)
|
||||
is RustEventSendState.Sent -> EventSendState.Sent(EventId(eventId))
|
||||
RustEventSendState.Cancelled -> EventSendState.Canceled
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue