Merge pull request #692 from vector-im/feature/fga/room_list_api
Feature/fga/room list api
This commit is contained in:
commit
2ce614ae97
44 changed files with 911 additions and 727 deletions
|
|
@ -22,7 +22,9 @@ import androidx.compose.foundation.layout.fillMaxSize
|
|||
import androidx.compose.runtime.Composable
|
||||
import androidx.compose.ui.Alignment
|
||||
import androidx.compose.ui.Modifier
|
||||
import androidx.lifecycle.Lifecycle
|
||||
import androidx.lifecycle.lifecycleScope
|
||||
import androidx.lifecycle.repeatOnLifecycle
|
||||
import coil.Coil
|
||||
import com.bumble.appyx.core.composable.Children
|
||||
import com.bumble.appyx.core.lifecycle.subscribe
|
||||
|
|
@ -43,6 +45,8 @@ import io.element.android.appnav.loggedin.LoggedInNode
|
|||
import io.element.android.features.analytics.api.AnalyticsEntryPoint
|
||||
import io.element.android.features.createroom.api.CreateRoomEntryPoint
|
||||
import io.element.android.features.invitelist.api.InviteListEntryPoint
|
||||
import io.element.android.features.networkmonitor.api.NetworkMonitor
|
||||
import io.element.android.features.networkmonitor.api.NetworkStatus
|
||||
import io.element.android.features.preferences.api.PreferencesEntryPoint
|
||||
import io.element.android.features.roomlist.api.RoomListEntryPoint
|
||||
import io.element.android.features.verifysession.api.VerifySessionEntryPoint
|
||||
|
|
@ -59,10 +63,13 @@ import io.element.android.libraries.di.AppScope
|
|||
import io.element.android.libraries.matrix.api.MatrixClient
|
||||
import io.element.android.libraries.matrix.api.core.MAIN_SPACE
|
||||
import io.element.android.libraries.matrix.api.core.RoomId
|
||||
import io.element.android.libraries.matrix.api.sync.SyncState
|
||||
import io.element.android.libraries.matrix.ui.di.MatrixUIBindings
|
||||
import io.element.android.services.analytics.api.AnalyticsService
|
||||
import io.element.android.services.appnavstate.api.AppNavigationStateService
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.debounce
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
|
|
@ -82,6 +89,7 @@ class LoggedInFlowNode @AssistedInject constructor(
|
|||
private val inviteListEntryPoint: InviteListEntryPoint,
|
||||
private val analyticsService: AnalyticsService,
|
||||
private val coroutineScope: CoroutineScope,
|
||||
private val networkMonitor: NetworkMonitor,
|
||||
snackbarDispatcher: SnackbarDispatcher,
|
||||
) : BackstackNode<LoggedInFlowNode.NavTarget>(
|
||||
backstack = BackStack(
|
||||
|
|
@ -128,6 +136,7 @@ class LoggedInFlowNode @AssistedInject constructor(
|
|||
) : NodeInputs
|
||||
|
||||
private val inputs: Inputs = inputs()
|
||||
private val syncService = inputs.matrixClient.syncService()
|
||||
private val loggedInFlowProcessor = LoggedInEventProcessor(
|
||||
snackbarDispatcher,
|
||||
inputs.matrixClient.roomMembershipObserver(),
|
||||
|
|
@ -142,12 +151,17 @@ class LoggedInFlowNode @AssistedInject constructor(
|
|||
plugins<LifecycleCallback>().forEach { it.onFlowCreated(id, inputs.matrixClient) }
|
||||
val imageLoaderFactory = bindings<MatrixUIBindings>().loggedInImageLoaderFactory()
|
||||
Coil.setImageLoader(imageLoaderFactory)
|
||||
inputs.matrixClient.startSync()
|
||||
appNavigationStateService.onNavigateToSession(id, inputs.matrixClient.sessionId)
|
||||
// TODO We do not support Space yet, so directly navigate to main space
|
||||
appNavigationStateService.onNavigateToSpace(id, MAIN_SPACE)
|
||||
loggedInFlowProcessor.observeEvents(coroutineScope)
|
||||
},
|
||||
onResume = {
|
||||
syncService.startSync()
|
||||
},
|
||||
onPause = {
|
||||
syncService.stopSync()
|
||||
},
|
||||
onDestroy = {
|
||||
val imageLoaderFactory = bindings<MatrixUIBindings>().notLoggedInImageLoaderFactory()
|
||||
Coil.setImageLoader(imageLoaderFactory)
|
||||
|
|
@ -157,6 +171,27 @@ class LoggedInFlowNode @AssistedInject constructor(
|
|||
loggedInFlowProcessor.stopObserving()
|
||||
}
|
||||
)
|
||||
|
||||
observeSyncStateAndNetworkStatus()
|
||||
}
|
||||
|
||||
private fun observeSyncStateAndNetworkStatus() {
|
||||
lifecycleScope.launch {
|
||||
repeatOnLifecycle(Lifecycle.State.RESUMED) {
|
||||
combine(
|
||||
syncService.syncState,
|
||||
networkMonitor.connectivity
|
||||
) { syncState, networkStatus ->
|
||||
syncState == SyncState.InError && networkStatus == NetworkStatus.Online
|
||||
}
|
||||
.distinctUntilChanged()
|
||||
.collect { restartSync ->
|
||||
if (restartSync) {
|
||||
syncService.startSync()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sealed interface NavTarget : Parcelable {
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ package io.element.android.appnav
|
|||
|
||||
import android.os.Parcelable
|
||||
import androidx.compose.runtime.Composable
|
||||
import androidx.compose.runtime.DisposableEffect
|
||||
import androidx.compose.ui.Modifier
|
||||
import androidx.lifecycle.lifecycleScope
|
||||
import com.bumble.appyx.core.composable.Children
|
||||
|
|
@ -89,6 +88,7 @@ class RoomFlowNode @AssistedInject constructor(
|
|||
lifecycle.subscribe(
|
||||
onCreate = {
|
||||
Timber.v("OnCreate")
|
||||
inputs.room.open()
|
||||
plugins<LifecycleCallback>().forEach { it.onFlowCreated(id, inputs.room) }
|
||||
appNavigationStateService.onNavigateToRoom(id, inputs.room.roomId)
|
||||
fetchRoomMembers()
|
||||
|
|
@ -159,18 +159,8 @@ class RoomFlowNode @AssistedInject constructor(
|
|||
data class RoomMemberDetails(val userId: UserId) : NavTarget
|
||||
}
|
||||
|
||||
private val timeline = inputs.room.timeline()
|
||||
|
||||
@Composable
|
||||
override fun View(modifier: Modifier) {
|
||||
|
||||
DisposableEffect(Unit) {
|
||||
timeline.initialize()
|
||||
onDispose {
|
||||
timeline.dispose()
|
||||
}
|
||||
}
|
||||
|
||||
Children(
|
||||
navModel = backstack,
|
||||
modifier = modifier,
|
||||
|
|
|
|||
|
|
@ -54,8 +54,8 @@ class InviteListPresenter @Inject constructor(
|
|||
@Composable
|
||||
override fun present(): InviteListState {
|
||||
val invites by client
|
||||
.invitesDataSource
|
||||
.roomSummaries()
|
||||
.roomSummaryDataSource
|
||||
.inviteRooms()
|
||||
.collectAsState()
|
||||
|
||||
var seenInvites by remember { mutableStateOf<Set<RoomId>>(emptySet()) }
|
||||
|
|
|
|||
|
|
@ -46,10 +46,10 @@ class InviteListPresenterTests {
|
|||
|
||||
@Test
|
||||
fun `present - starts empty, adds invites when received`() = runTest {
|
||||
val invitesDataSource = FakeRoomSummaryDataSource()
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource()
|
||||
val presenter = InviteListPresenter(
|
||||
FakeMatrixClient(
|
||||
invitesDataSource = invitesDataSource,
|
||||
roomSummaryDataSource = roomSummaryDataSource,
|
||||
),
|
||||
FakeSeenInvitesStore(),
|
||||
FakeAnalyticsService(),
|
||||
|
|
@ -60,7 +60,7 @@ class InviteListPresenterTests {
|
|||
val initialState = awaitItem()
|
||||
Truth.assertThat(initialState.inviteList).isEmpty()
|
||||
|
||||
invitesDataSource.postRoomSummary(listOf(aRoomSummary()))
|
||||
roomSummaryDataSource.postInviteRooms(listOf(aRoomSummary()))
|
||||
|
||||
val withInviteState = awaitItem()
|
||||
Truth.assertThat(withInviteState.inviteList.size).isEqualTo(1)
|
||||
|
|
@ -71,10 +71,10 @@ class InviteListPresenterTests {
|
|||
|
||||
@Test
|
||||
fun `present - uses user ID and avatar for direct invites`() = runTest {
|
||||
val invitesDataSource = FakeRoomSummaryDataSource().withDirectChatInvitation()
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource().withDirectChatInvitation()
|
||||
val presenter = InviteListPresenter(
|
||||
FakeMatrixClient(
|
||||
invitesDataSource = invitesDataSource,
|
||||
roomSummaryDataSource = roomSummaryDataSource,
|
||||
),
|
||||
FakeSeenInvitesStore(),
|
||||
FakeAnalyticsService(),
|
||||
|
|
@ -101,10 +101,10 @@ class InviteListPresenterTests {
|
|||
|
||||
@Test
|
||||
fun `present - includes sender details for room invites`() = runTest {
|
||||
val invitesDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val presenter = InviteListPresenter(
|
||||
FakeMatrixClient(
|
||||
invitesDataSource = invitesDataSource,
|
||||
roomSummaryDataSource = roomSummaryDataSource,
|
||||
),
|
||||
FakeSeenInvitesStore(),
|
||||
FakeAnalyticsService(),
|
||||
|
|
@ -129,10 +129,10 @@ class InviteListPresenterTests {
|
|||
|
||||
@Test
|
||||
fun `present - shows confirm dialog for declining direct chat invites`() = runTest {
|
||||
val invitesDataSource = FakeRoomSummaryDataSource().withDirectChatInvitation()
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource().withDirectChatInvitation()
|
||||
val presenter = InviteListPresenter(
|
||||
FakeMatrixClient(
|
||||
invitesDataSource = invitesDataSource,
|
||||
roomSummaryDataSource = roomSummaryDataSource,
|
||||
),
|
||||
FakeSeenInvitesStore(),
|
||||
FakeAnalyticsService(),
|
||||
|
|
@ -154,10 +154,10 @@ class InviteListPresenterTests {
|
|||
|
||||
@Test
|
||||
fun `present - shows confirm dialog for declining room invites`() = runTest {
|
||||
val invitesDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val presenter = InviteListPresenter(
|
||||
FakeMatrixClient(
|
||||
invitesDataSource = invitesDataSource,
|
||||
roomSummaryDataSource = roomSummaryDataSource,
|
||||
),
|
||||
FakeSeenInvitesStore(),
|
||||
FakeAnalyticsService(),
|
||||
|
|
@ -179,10 +179,10 @@ class InviteListPresenterTests {
|
|||
|
||||
@Test
|
||||
fun `present - hides confirm dialog when cancelling`() = runTest {
|
||||
val invitesDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val presenter = InviteListPresenter(
|
||||
FakeMatrixClient(
|
||||
invitesDataSource = invitesDataSource,
|
||||
roomSummaryDataSource = roomSummaryDataSource,
|
||||
),
|
||||
FakeSeenInvitesStore(),
|
||||
FakeAnalyticsService(),
|
||||
|
|
@ -204,9 +204,9 @@ class InviteListPresenterTests {
|
|||
|
||||
@Test
|
||||
fun `present - declines invite after confirming`() = runTest {
|
||||
val invitesDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val client = FakeMatrixClient(
|
||||
invitesDataSource = invitesDataSource,
|
||||
roomSummaryDataSource = roomSummaryDataSource,
|
||||
)
|
||||
val room = FakeMatrixRoom()
|
||||
val presenter = InviteListPresenter(client, FakeSeenInvitesStore(), FakeAnalyticsService())
|
||||
|
|
@ -230,9 +230,9 @@ class InviteListPresenterTests {
|
|||
|
||||
@Test
|
||||
fun `present - declines invite after confirming and sets state on error`() = runTest {
|
||||
val invitesDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val client = FakeMatrixClient(
|
||||
invitesDataSource = invitesDataSource,
|
||||
roomSummaryDataSource = roomSummaryDataSource,
|
||||
)
|
||||
val room = FakeMatrixRoom()
|
||||
val presenter = InviteListPresenter(client, FakeSeenInvitesStore(), FakeAnalyticsService())
|
||||
|
|
@ -261,9 +261,9 @@ class InviteListPresenterTests {
|
|||
|
||||
@Test
|
||||
fun `present - dismisses declining error state`() = runTest {
|
||||
val invitesDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val client = FakeMatrixClient(
|
||||
invitesDataSource = invitesDataSource,
|
||||
roomSummaryDataSource = roomSummaryDataSource,
|
||||
)
|
||||
val room = FakeMatrixRoom()
|
||||
val presenter = InviteListPresenter(client, FakeSeenInvitesStore(), FakeAnalyticsService())
|
||||
|
|
@ -293,9 +293,9 @@ class InviteListPresenterTests {
|
|||
|
||||
@Test
|
||||
fun `present - accepts invites and sets state on success`() = runTest {
|
||||
val invitesDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val client = FakeMatrixClient(
|
||||
invitesDataSource = invitesDataSource,
|
||||
roomSummaryDataSource = roomSummaryDataSource,
|
||||
)
|
||||
val room = FakeMatrixRoom()
|
||||
val presenter = InviteListPresenter(client, FakeSeenInvitesStore(), FakeAnalyticsService())
|
||||
|
|
@ -316,9 +316,9 @@ class InviteListPresenterTests {
|
|||
|
||||
@Test
|
||||
fun `present - accepts invites and sets state on error`() = runTest {
|
||||
val invitesDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val client = FakeMatrixClient(
|
||||
invitesDataSource = invitesDataSource,
|
||||
roomSummaryDataSource = roomSummaryDataSource,
|
||||
)
|
||||
val room = FakeMatrixRoom()
|
||||
val presenter = InviteListPresenter(client, FakeSeenInvitesStore(), FakeAnalyticsService())
|
||||
|
|
@ -341,9 +341,9 @@ class InviteListPresenterTests {
|
|||
|
||||
@Test
|
||||
fun `present - dismisses accepting error state`() = runTest {
|
||||
val invitesDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource().withRoomInvitation()
|
||||
val client = FakeMatrixClient(
|
||||
invitesDataSource = invitesDataSource,
|
||||
roomSummaryDataSource = roomSummaryDataSource,
|
||||
)
|
||||
val room = FakeMatrixRoom()
|
||||
val presenter = InviteListPresenter(client, FakeSeenInvitesStore(), FakeAnalyticsService())
|
||||
|
|
@ -368,11 +368,11 @@ class InviteListPresenterTests {
|
|||
|
||||
@Test
|
||||
fun `present - stores seen invites when received`() = runTest {
|
||||
val invitesDataSource = FakeRoomSummaryDataSource()
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource()
|
||||
val store = FakeSeenInvitesStore()
|
||||
val presenter = InviteListPresenter(
|
||||
FakeMatrixClient(
|
||||
invitesDataSource = invitesDataSource,
|
||||
roomSummaryDataSource = roomSummaryDataSource,
|
||||
),
|
||||
store,
|
||||
FakeAnalyticsService(),
|
||||
|
|
@ -383,19 +383,19 @@ class InviteListPresenterTests {
|
|||
awaitItem()
|
||||
|
||||
// When one invite is received, that ID is saved
|
||||
invitesDataSource.postRoomSummary(listOf(aRoomSummary()))
|
||||
roomSummaryDataSource.postInviteRooms(listOf(aRoomSummary()))
|
||||
|
||||
awaitItem()
|
||||
Truth.assertThat(store.getProvidedRoomIds()).isEqualTo(setOf(A_ROOM_ID))
|
||||
|
||||
// When a second is added, both are saved
|
||||
invitesDataSource.postRoomSummary(listOf(aRoomSummary(), aRoomSummary(A_ROOM_ID_2)))
|
||||
roomSummaryDataSource.postInviteRooms(listOf(aRoomSummary(), aRoomSummary(A_ROOM_ID_2)))
|
||||
|
||||
awaitItem()
|
||||
Truth.assertThat(store.getProvidedRoomIds()).isEqualTo(setOf(A_ROOM_ID, A_ROOM_ID_2))
|
||||
|
||||
// When they're both dismissed, an empty set is saved
|
||||
invitesDataSource.postRoomSummary(listOf())
|
||||
roomSummaryDataSource.postInviteRooms(listOf())
|
||||
|
||||
awaitItem()
|
||||
Truth.assertThat(store.getProvidedRoomIds()).isEmpty()
|
||||
|
|
@ -404,12 +404,12 @@ class InviteListPresenterTests {
|
|||
|
||||
@Test
|
||||
fun `present - marks invite as new if they're unseen`() = runTest {
|
||||
val invitesDataSource = FakeRoomSummaryDataSource()
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource()
|
||||
val store = FakeSeenInvitesStore()
|
||||
store.publishRoomIds(setOf(A_ROOM_ID))
|
||||
val presenter = InviteListPresenter(
|
||||
FakeMatrixClient(
|
||||
invitesDataSource = invitesDataSource,
|
||||
roomSummaryDataSource = roomSummaryDataSource,
|
||||
),
|
||||
store,
|
||||
FakeAnalyticsService(),
|
||||
|
|
@ -419,7 +419,7 @@ class InviteListPresenterTests {
|
|||
}.test {
|
||||
awaitItem()
|
||||
|
||||
invitesDataSource.postRoomSummary(listOf(aRoomSummary(), aRoomSummary(A_ROOM_ID_2)))
|
||||
roomSummaryDataSource.postInviteRooms(listOf(aRoomSummary(), aRoomSummary(A_ROOM_ID_2)))
|
||||
skipItems(1)
|
||||
|
||||
val withInviteState = awaitItem()
|
||||
|
|
@ -432,7 +432,7 @@ class InviteListPresenterTests {
|
|||
}
|
||||
|
||||
private suspend fun FakeRoomSummaryDataSource.withRoomInvitation(): FakeRoomSummaryDataSource {
|
||||
postRoomSummary(
|
||||
postInviteRooms(
|
||||
listOf(
|
||||
RoomSummary.Filled(
|
||||
RoomSummaryDetails(
|
||||
|
|
@ -461,7 +461,7 @@ class InviteListPresenterTests {
|
|||
}
|
||||
|
||||
private suspend fun FakeRoomSummaryDataSource.withDirectChatInvitation(): FakeRoomSummaryDataSource {
|
||||
postRoomSummary(
|
||||
postInviteRooms(
|
||||
listOf(
|
||||
RoomSummary.Filled(
|
||||
RoomSummaryDetails(
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ class MessagesPresenter @AssistedInject constructor(
|
|||
mutableStateOf(null)
|
||||
}
|
||||
|
||||
val networkConnectionStatus by networkMonitor.connectivity.collectAsState(initial = networkMonitor.currentConnectivityStatus)
|
||||
val networkConnectionStatus by networkMonitor.connectivity.collectAsState()
|
||||
|
||||
val snackbarMessage by snackbarDispatcher.collectSnackbarMessageAsState()
|
||||
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ class ForwardMessagesPresenter @AssistedInject constructor(
|
|||
var results: SearchBarResultState<ImmutableList<RoomSummaryDetails>> by remember { mutableStateOf(SearchBarResultState.NotSearching()) }
|
||||
val forwardingActionState: MutableState<Async<ImmutableList<RoomId>>> = remember { mutableStateOf(Async.Uninitialized) }
|
||||
|
||||
val summaries by client.roomSummaryDataSource.roomSummaries().collectAsState()
|
||||
val summaries by client.roomSummaryDataSource.allRooms().collectAsState()
|
||||
|
||||
LaunchedEffect(query, summaries) {
|
||||
val filteredSummaries = summaries.filterIsInstance<RoomSummary.Filled>()
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ class ForwardMessagesPresenterTests {
|
|||
@Test
|
||||
fun `present - update query`() = runTest {
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource().apply {
|
||||
postRoomSummary(listOf(RoomSummary.Filled(aRoomSummaryDetail())))
|
||||
postAllRooms(listOf(RoomSummary.Filled(aRoomSummaryDetail())))
|
||||
}
|
||||
val client = FakeMatrixClient(roomSummaryDataSource = roomSummaryDataSource)
|
||||
val presenter = aPresenter(client = client)
|
||||
|
|
|
|||
|
|
@ -16,9 +16,8 @@
|
|||
|
||||
package io.element.android.features.networkmonitor.api
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
|
||||
interface NetworkMonitor {
|
||||
val connectivity: Flow<NetworkStatus>
|
||||
val currentConnectivityStatus: NetworkStatus
|
||||
val connectivity: StateFlow<NetworkStatus>
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
@file:OptIn(FlowPreview::class)
|
||||
|
||||
package io.element.android.features.networkmonitor.impl
|
||||
|
||||
import android.content.Context
|
||||
|
|
@ -27,63 +29,80 @@ import io.element.android.features.networkmonitor.api.NetworkStatus
|
|||
import io.element.android.libraries.di.AppScope
|
||||
import io.element.android.libraries.di.ApplicationContext
|
||||
import io.element.android.libraries.di.SingleIn
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.channels.awaitClose
|
||||
import kotlinx.coroutines.channels.trySendBlocking
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.callbackFlow
|
||||
import kotlinx.coroutines.flow.debounce
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.flow.stateIn
|
||||
import timber.log.Timber
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.inject.Inject
|
||||
|
||||
@ContributesBinding(scope = AppScope::class)
|
||||
@SingleIn(AppScope::class)
|
||||
class NetworkMonitorImpl @Inject constructor(
|
||||
@ApplicationContext context: Context
|
||||
@ApplicationContext context: Context,
|
||||
appCoroutineScope: CoroutineScope,
|
||||
) : NetworkMonitor {
|
||||
|
||||
private val connectivityManager: ConnectivityManager = context.getSystemService(ConnectivityManager::class.java)
|
||||
|
||||
private val callback = object : ConnectivityManager.NetworkCallback() {
|
||||
override fun onAvailable(network: Network) {
|
||||
_connectivity.value = connectivityManager.currentConnectionStatus()
|
||||
Timber.v("Connectivity status (available): ${connectivityManager.currentConnectionStatus()}")
|
||||
override val connectivity: StateFlow<NetworkStatus> = callbackFlow {
|
||||
|
||||
/**
|
||||
* Calling connectivityManager methods synchronously from the callbacks is not safe.
|
||||
* So instead we just keep the count of active networks, ie. those checking the capability request.
|
||||
* Debounce the result to avoid quick offline<->online changes.
|
||||
*/
|
||||
val callback = object : ConnectivityManager.NetworkCallback() {
|
||||
|
||||
private val activeNetworksCount = AtomicInteger(0)
|
||||
|
||||
override fun onLost(network: Network) {
|
||||
if (activeNetworksCount.decrementAndGet() == 0) {
|
||||
trySendBlocking(NetworkStatus.Offline)
|
||||
}
|
||||
}
|
||||
|
||||
override fun onAvailable(network: Network) {
|
||||
if (activeNetworksCount.incrementAndGet() > 0) {
|
||||
trySendBlocking(NetworkStatus.Online)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun onLost(network: Network) {
|
||||
_connectivity.value = connectivityManager.currentConnectionStatus()
|
||||
Timber.v("Connectivity status (lost): ${connectivityManager.currentConnectionStatus()}")
|
||||
}
|
||||
|
||||
override fun onCapabilitiesChanged(
|
||||
network: Network,
|
||||
networkCapabilities: NetworkCapabilities
|
||||
) {
|
||||
_connectivity.value = connectivityManager.currentConnectionStatus()
|
||||
Timber.v("Connectivity status (changed): ${connectivityManager.currentConnectionStatus()}")
|
||||
}
|
||||
}
|
||||
|
||||
private val _connectivity = MutableStateFlow(NetworkStatus.Online)
|
||||
override val connectivity: Flow<NetworkStatus> = _connectivity
|
||||
|
||||
override val currentConnectivityStatus: NetworkStatus get() = _connectivity.value
|
||||
|
||||
init {
|
||||
listenToConnectionChanges()
|
||||
}
|
||||
|
||||
private fun listenToConnectionChanges() {
|
||||
trySendBlocking(connectivityManager.activeNetworkStatus())
|
||||
val request = NetworkRequest.Builder()
|
||||
// .addTransportType(NetworkCapabilities.TRANSPORT_WIFI)
|
||||
// .addTransportType(NetworkCapabilities.TRANSPORT_CELLULAR)
|
||||
.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
|
||||
.addCapability(NetworkCapabilities.NET_CAPABILITY_VALIDATED)
|
||||
.build()
|
||||
connectivityManager.registerNetworkCallback(request, callback)
|
||||
|
||||
_connectivity.tryEmit(connectivityManager.currentConnectionStatus())
|
||||
connectivityManager.registerNetworkCallback(request, callback)
|
||||
Timber.d("Subscribe")
|
||||
awaitClose {
|
||||
Timber.d("Unsubscribe")
|
||||
connectivityManager.unregisterNetworkCallback(callback)
|
||||
}
|
||||
}
|
||||
.distinctUntilChanged()
|
||||
.debounce(300)
|
||||
.onEach {
|
||||
Timber.d("NetworkStatus changed=$it")
|
||||
}
|
||||
.stateIn(appCoroutineScope, SharingStarted.WhileSubscribed(), connectivityManager.activeNetworkStatus())
|
||||
|
||||
private fun ConnectivityManager.activeNetworkStatus(): NetworkStatus {
|
||||
return activeNetwork?.let {
|
||||
getNetworkCapabilities(it)?.getNetworkStatus()
|
||||
} ?: NetworkStatus.Offline
|
||||
}
|
||||
|
||||
private fun ConnectivityManager.currentConnectionStatus(): NetworkStatus {
|
||||
val hasInternet = activeNetwork?.let(::getNetworkCapabilities)
|
||||
?.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
|
||||
?: false
|
||||
private fun NetworkCapabilities.getNetworkStatus(): NetworkStatus {
|
||||
val hasInternet = hasCapability(NetworkCapabilities.NET_CAPABILITY_VALIDATED)
|
||||
return if (hasInternet) {
|
||||
NetworkStatus.Online
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -18,13 +18,8 @@ package io.element.android.features.networkmonitor.test
|
|||
|
||||
import io.element.android.features.networkmonitor.api.NetworkMonitor
|
||||
import io.element.android.features.networkmonitor.api.NetworkStatus
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
|
||||
class FakeNetworkMonitor(initialStatus: NetworkStatus = NetworkStatus.Online) : NetworkMonitor {
|
||||
override val currentConnectivityStatus: NetworkStatus
|
||||
get() = _connectivityStatus.value
|
||||
|
||||
private val _connectivityStatus: MutableStateFlow<NetworkStatus> = MutableStateFlow(initialStatus)
|
||||
override val connectivity: Flow<NetworkStatus> = _connectivityStatus
|
||||
override val connectivity = MutableStateFlow(initialStatus)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,8 +43,8 @@ class DefaultInviteStateDataSource @Inject constructor(
|
|||
@Composable
|
||||
override fun inviteState(): InvitesState {
|
||||
val invites by client
|
||||
.invitesDataSource
|
||||
.roomSummaries()
|
||||
.roomSummaryDataSource
|
||||
.inviteRooms()
|
||||
.collectAsState()
|
||||
|
||||
val seenInvites by seenInvitesStore
|
||||
|
|
|
|||
|
|
@ -78,10 +78,10 @@ class RoomListPresenter @Inject constructor(
|
|||
var filter by rememberSaveable { mutableStateOf("") }
|
||||
val roomSummaries by client
|
||||
.roomSummaryDataSource
|
||||
.roomSummaries()
|
||||
.allRooms()
|
||||
.collectAsState()
|
||||
|
||||
val networkConnectionStatus by networkMonitor.connectivity.collectAsState(initial = networkMonitor.currentConnectivityStatus)
|
||||
val networkConnectionStatus by networkMonitor.connectivity.collectAsState()
|
||||
|
||||
Timber.v("RoomSummaries size = ${roomSummaries.size}")
|
||||
|
||||
|
|
@ -178,7 +178,7 @@ class RoomListPresenter @Inject constructor(
|
|||
// Safe to give bigger size than room list
|
||||
val extendedRangeEnd = range.last + midExtendedRangeSize
|
||||
val extendedRange = IntRange(extendedRangeStart, extendedRangeEnd)
|
||||
client.roomSummaryDataSource.setSlidingSyncRange(extendedRange)
|
||||
client.roomSummaryDataSource.updateRoomListVisibleRange(extendedRange)
|
||||
}
|
||||
|
||||
private suspend fun mapRoomSummaries(
|
||||
|
|
|
|||
|
|
@ -34,8 +34,8 @@ internal class DefaultInviteStateDataSourceTest {
|
|||
|
||||
@Test
|
||||
fun `emits NoInvites state if invites list is empty`() = runTest {
|
||||
val matrixDataSource = FakeRoomSummaryDataSource()
|
||||
val client = FakeMatrixClient(invitesDataSource = matrixDataSource)
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource()
|
||||
val client = FakeMatrixClient(roomSummaryDataSource = roomSummaryDataSource)
|
||||
val seenStore = FakeSeenInvitesStore()
|
||||
val dataSource = DefaultInviteStateDataSource(client, seenStore, testCoroutineDispatchers())
|
||||
|
||||
|
|
@ -48,9 +48,9 @@ internal class DefaultInviteStateDataSourceTest {
|
|||
|
||||
@Test
|
||||
fun `emits NewInvites state if unseen invite exists`() = runTest {
|
||||
val matrixDataSource = FakeRoomSummaryDataSource()
|
||||
matrixDataSource.postRoomSummary(listOf(aRoomSummaryFilled(roomId = A_ROOM_ID)))
|
||||
val client = FakeMatrixClient(invitesDataSource = matrixDataSource)
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource()
|
||||
roomSummaryDataSource.postInviteRooms(listOf(aRoomSummaryFilled(roomId = A_ROOM_ID)))
|
||||
val client = FakeMatrixClient(roomSummaryDataSource = roomSummaryDataSource)
|
||||
val seenStore = FakeSeenInvitesStore()
|
||||
val dataSource = DefaultInviteStateDataSource(client, seenStore, testCoroutineDispatchers())
|
||||
|
||||
|
|
@ -64,9 +64,9 @@ internal class DefaultInviteStateDataSourceTest {
|
|||
|
||||
@Test
|
||||
fun `emits NewInvites state if multiple invites exist and at least one is unseen`() = runTest {
|
||||
val matrixDataSource = FakeRoomSummaryDataSource()
|
||||
matrixDataSource.postRoomSummary(listOf(aRoomSummaryFilled(roomId = A_ROOM_ID), aRoomSummaryFilled(roomId = A_ROOM_ID_2)))
|
||||
val client = FakeMatrixClient(invitesDataSource = matrixDataSource)
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource()
|
||||
roomSummaryDataSource.postInviteRooms(listOf(aRoomSummaryFilled(roomId = A_ROOM_ID), aRoomSummaryFilled(roomId = A_ROOM_ID_2)))
|
||||
val client = FakeMatrixClient(roomSummaryDataSource = roomSummaryDataSource)
|
||||
val seenStore = FakeSeenInvitesStore()
|
||||
seenStore.publishRoomIds(setOf(A_ROOM_ID))
|
||||
val dataSource = DefaultInviteStateDataSource(client, seenStore, testCoroutineDispatchers(useUnconfinedTestDispatcher = true))
|
||||
|
|
@ -81,9 +81,9 @@ internal class DefaultInviteStateDataSourceTest {
|
|||
|
||||
@Test
|
||||
fun `emits SeenInvites state if invite exists in seen store`() = runTest {
|
||||
val matrixDataSource = FakeRoomSummaryDataSource()
|
||||
matrixDataSource.postRoomSummary(listOf(aRoomSummaryFilled(roomId = A_ROOM_ID)))
|
||||
val client = FakeMatrixClient(invitesDataSource = matrixDataSource)
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource()
|
||||
roomSummaryDataSource.postInviteRooms(listOf(aRoomSummaryFilled(roomId = A_ROOM_ID)))
|
||||
val client = FakeMatrixClient(roomSummaryDataSource = roomSummaryDataSource)
|
||||
val seenStore = FakeSeenInvitesStore()
|
||||
seenStore.publishRoomIds(setOf(A_ROOM_ID))
|
||||
val dataSource = DefaultInviteStateDataSource(client, seenStore, testCoroutineDispatchers(useUnconfinedTestDispatcher = true))
|
||||
|
|
@ -99,8 +99,8 @@ internal class DefaultInviteStateDataSourceTest {
|
|||
|
||||
@Test
|
||||
fun `emits new state in response to upstream events`() = runTest {
|
||||
val matrixDataSource = FakeRoomSummaryDataSource()
|
||||
val client = FakeMatrixClient(invitesDataSource = matrixDataSource)
|
||||
val roomSummaryDataSource = FakeRoomSummaryDataSource()
|
||||
val client = FakeMatrixClient(roomSummaryDataSource = roomSummaryDataSource)
|
||||
val seenStore = FakeSeenInvitesStore()
|
||||
val dataSource = DefaultInviteStateDataSource(client, seenStore, testCoroutineDispatchers())
|
||||
|
||||
|
|
@ -111,7 +111,7 @@ internal class DefaultInviteStateDataSourceTest {
|
|||
Truth.assertThat(awaitItem()).isEqualTo(InvitesState.NoInvites)
|
||||
|
||||
// When a single invite is received, state should be NewInvites
|
||||
matrixDataSource.postRoomSummary(listOf(aRoomSummaryFilled(roomId = A_ROOM_ID)))
|
||||
roomSummaryDataSource.postInviteRooms(listOf(aRoomSummaryFilled(roomId = A_ROOM_ID)))
|
||||
skipItems(1)
|
||||
Truth.assertThat(awaitItem()).isEqualTo(InvitesState.NewInvites)
|
||||
|
||||
|
|
@ -121,12 +121,12 @@ internal class DefaultInviteStateDataSourceTest {
|
|||
Truth.assertThat(awaitItem()).isEqualTo(InvitesState.SeenInvites)
|
||||
|
||||
// Another new invite resets it to NewInvites
|
||||
matrixDataSource.postRoomSummary(listOf(aRoomSummaryFilled(roomId = A_ROOM_ID), aRoomSummaryFilled(roomId = A_ROOM_ID_2)))
|
||||
roomSummaryDataSource.postInviteRooms(listOf(aRoomSummaryFilled(roomId = A_ROOM_ID), aRoomSummaryFilled(roomId = A_ROOM_ID_2)))
|
||||
skipItems(1)
|
||||
Truth.assertThat(awaitItem()).isEqualTo(InvitesState.NewInvites)
|
||||
|
||||
// All of the invites going away reverts to NoInvites
|
||||
matrixDataSource.postRoomSummary(emptyList())
|
||||
roomSummaryDataSource.postInviteRooms(emptyList())
|
||||
skipItems(1)
|
||||
Truth.assertThat(awaitItem()).isEqualTo(InvitesState.NoInvites)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -147,7 +147,7 @@ class RoomListPresenterTests {
|
|||
// Room list is loaded with 16 placeholders
|
||||
Truth.assertThat(withUserState.roomList.size).isEqualTo(16)
|
||||
Truth.assertThat(withUserState.roomList.all { it.isPlaceholder }).isTrue()
|
||||
roomSummaryDataSource.postRoomSummary(listOf(aRoomSummaryFilled()))
|
||||
roomSummaryDataSource.postAllRooms(listOf(aRoomSummaryFilled()))
|
||||
skipItems(1)
|
||||
val withRoomState = awaitItem()
|
||||
Truth.assertThat(withRoomState.roomList.size).isEqualTo(1)
|
||||
|
|
@ -174,7 +174,7 @@ class RoomListPresenterTests {
|
|||
moleculeFlow(RecompositionClock.Immediate) {
|
||||
presenter.present()
|
||||
}.test {
|
||||
roomSummaryDataSource.postRoomSummary(listOf(aRoomSummaryFilled()))
|
||||
roomSummaryDataSource.postAllRooms(listOf(aRoomSummaryFilled()))
|
||||
skipItems(3)
|
||||
val loadedState = awaitItem()
|
||||
// Test filtering with result
|
||||
|
|
@ -212,7 +212,7 @@ class RoomListPresenterTests {
|
|||
moleculeFlow(RecompositionClock.Immediate) {
|
||||
presenter.present()
|
||||
}.test {
|
||||
roomSummaryDataSource.postRoomSummary(listOf(aRoomSummaryFilled()))
|
||||
roomSummaryDataSource.postAllRooms(listOf(aRoomSummaryFilled()))
|
||||
skipItems(3)
|
||||
val loadedState = awaitItem()
|
||||
// check initial value
|
||||
|
|
|
|||
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Copyright (c) 2023 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.core.coroutine
|
||||
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineName
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.job
|
||||
import kotlinx.coroutines.plus
|
||||
|
||||
/**
|
||||
* Create a child scope of the current scope.
|
||||
* The child scope will be cancelled if the parent scope is cancelled.
|
||||
* The child scope will be cancelled if an exception is thrown in the parent scope.
|
||||
* The parent scope won't be cancelled when an exception is thrown in the child scope.
|
||||
*
|
||||
* @param dispatcher the dispatcher to use for this scope.
|
||||
* @param name the name of the coroutine.
|
||||
*/
|
||||
fun CoroutineScope.childScope(
|
||||
dispatcher: CoroutineDispatcher,
|
||||
name: String,
|
||||
): CoroutineScope = run {
|
||||
val supervisorJob = SupervisorJob(parent = coroutineContext.job)
|
||||
this + dispatcher + supervisorJob + CoroutineName(name)
|
||||
}
|
||||
|
|
@ -27,6 +27,7 @@ import io.element.android.libraries.matrix.api.pusher.PushersService
|
|||
import io.element.android.libraries.matrix.api.room.MatrixRoom
|
||||
import io.element.android.libraries.matrix.api.room.RoomMembershipObserver
|
||||
import io.element.android.libraries.matrix.api.room.RoomSummaryDataSource
|
||||
import io.element.android.libraries.matrix.api.sync.SyncService
|
||||
import io.element.android.libraries.matrix.api.user.MatrixSearchUserResults
|
||||
import io.element.android.libraries.matrix.api.user.MatrixUser
|
||||
import io.element.android.libraries.matrix.api.verification.SessionVerificationService
|
||||
|
|
@ -35,7 +36,6 @@ import java.io.Closeable
|
|||
interface MatrixClient : Closeable {
|
||||
val sessionId: SessionId
|
||||
val roomSummaryDataSource: RoomSummaryDataSource
|
||||
val invitesDataSource: RoomSummaryDataSource
|
||||
val mediaLoader: MatrixMediaLoader
|
||||
fun getRoom(roomId: RoomId): MatrixRoom?
|
||||
fun findDM(userId: UserId): MatrixRoom?
|
||||
|
|
@ -45,8 +45,7 @@ interface MatrixClient : Closeable {
|
|||
suspend fun createDM(userId: UserId): Result<RoomId>
|
||||
suspend fun getProfile(userId: UserId): Result<MatrixUser>
|
||||
suspend fun searchUsers(searchTerm: String, limit: Long): Result<MatrixSearchUserResults>
|
||||
fun startSync()
|
||||
fun stopSync()
|
||||
fun syncService(): SyncService
|
||||
fun sessionVerificationService(): SessionVerificationService
|
||||
fun pushersService(): PushersService
|
||||
fun notificationService(): NotificationService
|
||||
|
|
@ -60,6 +59,5 @@ interface MatrixClient : Closeable {
|
|||
suspend fun loadUserDisplayName(): Result<String>
|
||||
suspend fun loadUserAvatarURLString(): Result<String?>
|
||||
suspend fun uploadMedia(mimeType: String, data: ByteArray, progressCallback: ProgressCallback?): Result<String>
|
||||
fun onSlidingSyncUpdate()
|
||||
fun roomMembershipObserver(): RoomMembershipObserver
|
||||
}
|
||||
|
|
|
|||
|
|
@ -62,6 +62,8 @@ interface MatrixRoom : Closeable {
|
|||
|
||||
fun timeline(): MatrixTimeline
|
||||
|
||||
fun open(): Result<Unit>
|
||||
|
||||
suspend fun userDisplayName(userId: UserId): Result<String?>
|
||||
|
||||
suspend fun userAvatarUrl(userId: UserId): Result<String?>
|
||||
|
|
|
|||
|
|
@ -19,6 +19,14 @@ package io.element.android.libraries.matrix.api.room
|
|||
import kotlinx.coroutines.flow.StateFlow
|
||||
|
||||
interface RoomSummaryDataSource {
|
||||
fun roomSummaries(): StateFlow<List<RoomSummary>>
|
||||
fun setSlidingSyncRange(range: IntRange)
|
||||
|
||||
sealed class LoadingState {
|
||||
object NotLoaded : LoadingState()
|
||||
data class Loaded(val numberOfRooms: Int): LoadingState()
|
||||
}
|
||||
|
||||
fun allRoomsLoadingState(): StateFlow<LoadingState>
|
||||
fun allRooms(): StateFlow<List<RoomSummary>>
|
||||
fun inviteRooms(): StateFlow<List<RoomSummary>>
|
||||
fun updateRoomListVisibleRange(range: IntRange)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Copyright (c) 2023 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.api.sync
|
||||
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
|
||||
interface SyncService {
|
||||
/**
|
||||
* Tries to start the sync. If already syncing it has no effect.
|
||||
*/
|
||||
fun startSync(): Result<Unit>
|
||||
|
||||
/**
|
||||
* Tries to stop the sync. If service is not syncing it has no effect.
|
||||
*/
|
||||
fun stopSync(): Result<Unit>
|
||||
|
||||
/**
|
||||
* Flow of [SyncState]. Will be updated as soon as the current [SyncState] changes.
|
||||
*/
|
||||
val syncState: StateFlow<SyncState>
|
||||
}
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
/*
|
||||
* Copyright (c) 2023 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.api.sync
|
||||
|
||||
enum class SyncState {
|
||||
Idle,
|
||||
Syncing,
|
||||
InError,
|
||||
Terminated,
|
||||
}
|
||||
|
|
@ -28,20 +28,8 @@ interface MatrixTimeline {
|
|||
)
|
||||
|
||||
fun paginationState(): StateFlow<PaginationState>
|
||||
|
||||
fun timelineItems(): Flow<List<MatrixTimelineItem>>
|
||||
|
||||
suspend fun paginateBackwards(requestSize: Int, untilNumberOfItems: Int): Result<Unit>
|
||||
fun initialize()
|
||||
fun dispose()
|
||||
|
||||
/**
|
||||
* @param message markdown message
|
||||
*/
|
||||
suspend fun sendMessage(message: String): Result<Unit>
|
||||
|
||||
suspend fun editMessage(originalEventId: EventId, message: String): Result<Unit>
|
||||
|
||||
suspend fun replyMessage(inReplyToEventId: EventId, message: String): Result<Unit>
|
||||
|
||||
suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit>
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,13 +14,12 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
@file:OptIn(ExperimentalCoroutinesApi::class)
|
||||
|
||||
package io.element.android.libraries.matrix.impl
|
||||
|
||||
import io.element.android.libraries.androidutils.file.getSizeOfFiles
|
||||
import io.element.android.libraries.androidutils.file.safeDelete
|
||||
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
|
||||
import io.element.android.libraries.core.coroutine.childScope
|
||||
import io.element.android.libraries.matrix.api.MatrixClient
|
||||
import io.element.android.libraries.matrix.api.core.ProgressCallback
|
||||
import io.element.android.libraries.matrix.api.core.RoomId
|
||||
|
|
@ -31,11 +30,11 @@ import io.element.android.libraries.matrix.api.createroom.RoomVisibility
|
|||
import io.element.android.libraries.matrix.api.media.MatrixMediaLoader
|
||||
import io.element.android.libraries.matrix.api.notification.NotificationService
|
||||
import io.element.android.libraries.matrix.api.pusher.PushersService
|
||||
import io.element.android.libraries.matrix.api.room.ForwardEventException
|
||||
import io.element.android.libraries.matrix.api.room.MatrixRoom
|
||||
import io.element.android.libraries.matrix.api.room.RoomMembershipObserver
|
||||
import io.element.android.libraries.matrix.api.room.RoomSummaryDataSource
|
||||
import io.element.android.libraries.matrix.api.timeline.item.event.EventType
|
||||
import io.element.android.libraries.matrix.api.sync.SyncService
|
||||
import io.element.android.libraries.matrix.api.sync.SyncState
|
||||
import io.element.android.libraries.matrix.api.user.MatrixSearchUserResults
|
||||
import io.element.android.libraries.matrix.api.user.MatrixUser
|
||||
import io.element.android.libraries.matrix.api.verification.SessionVerificationService
|
||||
|
|
@ -46,7 +45,8 @@ import io.element.android.libraries.matrix.impl.pushers.RustPushersService
|
|||
import io.element.android.libraries.matrix.impl.room.RoomContentForwarder
|
||||
import io.element.android.libraries.matrix.impl.room.RustMatrixRoom
|
||||
import io.element.android.libraries.matrix.impl.room.RustRoomSummaryDataSource
|
||||
import io.element.android.libraries.matrix.impl.sync.SlidingSyncObserverProxy
|
||||
import io.element.android.libraries.matrix.impl.room.roomOrNull
|
||||
import io.element.android.libraries.matrix.impl.sync.RustSyncService
|
||||
import io.element.android.libraries.matrix.impl.usersearch.UserProfileMapper
|
||||
import io.element.android.libraries.matrix.impl.usersearch.UserSearchResultMapper
|
||||
import io.element.android.libraries.matrix.impl.verification.RustSessionVerificationService
|
||||
|
|
@ -54,10 +54,7 @@ import io.element.android.libraries.sessionstorage.api.SessionStore
|
|||
import io.element.android.services.toolbox.api.systemclock.SystemClock
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
|
|
@ -66,18 +63,9 @@ import kotlinx.coroutines.withContext
|
|||
import kotlinx.coroutines.withTimeout
|
||||
import org.matrix.rustcomponents.sdk.Client
|
||||
import org.matrix.rustcomponents.sdk.ClientDelegate
|
||||
import org.matrix.rustcomponents.sdk.RequiredState
|
||||
import org.matrix.rustcomponents.sdk.RoomMessageEventContent
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncList
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListBuilder
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListOnceBuilt
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncRequestListFilters
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncSelectiveModeBuilder
|
||||
import org.matrix.rustcomponents.sdk.TaskHandle
|
||||
import org.matrix.rustcomponents.sdk.use
|
||||
import timber.log.Timber
|
||||
import java.io.File
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import org.matrix.rustcomponents.sdk.CreateRoomParameters as RustCreateRoomParameters
|
||||
import org.matrix.rustcomponents.sdk.RoomPreset as RustRoomPreset
|
||||
import org.matrix.rustcomponents.sdk.RoomVisibility as RustRoomVisibility
|
||||
|
|
@ -85,7 +73,7 @@ import org.matrix.rustcomponents.sdk.RoomVisibility as RustRoomVisibility
|
|||
class RustMatrixClient constructor(
|
||||
private val client: Client,
|
||||
private val sessionStore: SessionStore,
|
||||
private val coroutineScope: CoroutineScope,
|
||||
private val appCoroutineScope: CoroutineScope,
|
||||
private val dispatchers: CoroutineDispatchers,
|
||||
private val baseDirectory: File,
|
||||
private val baseCacheDirectory: File,
|
||||
|
|
@ -93,14 +81,16 @@ class RustMatrixClient constructor(
|
|||
) : MatrixClient {
|
||||
|
||||
override val sessionId: UserId = UserId(client.userId())
|
||||
|
||||
private val roomListService = client.roomListServiceWithEncryption()
|
||||
private val sessionCoroutineScope = appCoroutineScope.childScope(dispatchers.main, "Session-${sessionId}")
|
||||
private val verificationService = RustSessionVerificationService()
|
||||
private val syncService = RustSyncService(roomListService, sessionCoroutineScope)
|
||||
private val pushersService = RustPushersService(
|
||||
client = client,
|
||||
dispatchers = dispatchers,
|
||||
)
|
||||
private val notificationService = RustNotificationService(client)
|
||||
private var slidingSyncUpdateJob: Job? = null
|
||||
|
||||
|
||||
private val clientDelegate = object : ClientDelegate {
|
||||
override fun didReceiveAuthError(isSoftLogout: Boolean) {
|
||||
|
|
@ -109,125 +99,44 @@ class RustMatrixClient constructor(
|
|||
}
|
||||
}
|
||||
|
||||
private val visibleRoomsSlidingSyncFilters = SlidingSyncRequestListFilters(
|
||||
isDm = null,
|
||||
spaces = emptyList(),
|
||||
isEncrypted = null,
|
||||
isInvite = false,
|
||||
isTombstoned = false,
|
||||
roomTypes = emptyList(),
|
||||
notRoomTypes = listOf("m.space"),
|
||||
roomNameLike = null,
|
||||
tags = emptyList(),
|
||||
notTags = emptyList()
|
||||
)
|
||||
|
||||
private val visibleRoomsSlidingSyncList = MutableSharedFlow<SlidingSyncList>(replay = 1)
|
||||
private val visibleRoomsSlidingSyncListBuilder = SlidingSyncListBuilder("CurrentlyVisibleRooms")
|
||||
.timelineLimit(limit = 1u)
|
||||
.requiredState(
|
||||
requiredState = listOf(
|
||||
RequiredState(key = EventType.STATE_ROOM_AVATAR, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_ENCRYPTION, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_JOIN_RULES, value = ""),
|
||||
)
|
||||
)
|
||||
.filters(visibleRoomsSlidingSyncFilters)
|
||||
.syncModeSelective(SlidingSyncSelectiveModeBuilder().addRange(0u, 20u))
|
||||
.onceBuilt(object : SlidingSyncListOnceBuilt {
|
||||
override fun updateList(list: SlidingSyncList): SlidingSyncList {
|
||||
visibleRoomsSlidingSyncList.tryEmit(list)
|
||||
return list
|
||||
}
|
||||
})
|
||||
|
||||
private val invitesSlidingSyncFilters = visibleRoomsSlidingSyncFilters.copy(isInvite = true)
|
||||
|
||||
private val invitesSlidingSyncList = MutableSharedFlow<SlidingSyncList>(replay = 1)
|
||||
private val invitesSlidingSyncListBuilder = SlidingSyncListBuilder("CurrentInvites")
|
||||
.timelineLimit(limit = 1u)
|
||||
.requiredState(
|
||||
requiredState = listOf(
|
||||
RequiredState(key = EventType.STATE_ROOM_AVATAR, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_ENCRYPTION, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""),
|
||||
)
|
||||
)
|
||||
.filters(invitesSlidingSyncFilters)
|
||||
.syncModeSelective(SlidingSyncSelectiveModeBuilder().addRange(0u, 20u))
|
||||
.onceBuilt(object : SlidingSyncListOnceBuilt {
|
||||
override fun updateList(list: SlidingSyncList): SlidingSyncList {
|
||||
invitesSlidingSyncList.tryEmit(list)
|
||||
return list
|
||||
}
|
||||
})
|
||||
|
||||
private val slidingSync = client
|
||||
.slidingSync("ElementX")
|
||||
// .homeserver("https://slidingsync.lab.matrix.org")
|
||||
.withCommonExtensions()
|
||||
.addList(visibleRoomsSlidingSyncListBuilder)
|
||||
.addList(invitesSlidingSyncListBuilder)
|
||||
.use {
|
||||
it.build()
|
||||
}
|
||||
|
||||
private val slidingSyncObserverProxy = SlidingSyncObserverProxy(coroutineScope)
|
||||
private val rustRoomSummaryDataSource: RustRoomSummaryDataSource =
|
||||
RustRoomSummaryDataSource(
|
||||
slidingSyncObserverProxy.updateSummaryFlow,
|
||||
slidingSync,
|
||||
visibleRoomsSlidingSyncList,
|
||||
dispatchers,
|
||||
roomListService = roomListService,
|
||||
sessionCoroutineScope = sessionCoroutineScope,
|
||||
coroutineDispatchers = dispatchers,
|
||||
)
|
||||
|
||||
override val roomSummaryDataSource: RoomSummaryDataSource
|
||||
get() = rustRoomSummaryDataSource
|
||||
|
||||
private val rustInvitesDataSource: RustRoomSummaryDataSource =
|
||||
RustRoomSummaryDataSource(
|
||||
slidingSyncObserverProxy.updateSummaryFlow,
|
||||
slidingSync,
|
||||
invitesSlidingSyncList,
|
||||
dispatchers,
|
||||
)
|
||||
|
||||
override val invitesDataSource: RoomSummaryDataSource
|
||||
get() = rustInvitesDataSource
|
||||
|
||||
private val rustMediaLoader = RustMediaLoader(baseCacheDirectory, dispatchers, client)
|
||||
override val mediaLoader: MatrixMediaLoader
|
||||
get() = rustMediaLoader
|
||||
|
||||
private var slidingSyncObserverToken: TaskHandle? = null
|
||||
|
||||
private val isSyncing = AtomicBoolean(false)
|
||||
|
||||
private val roomMembershipObserver = RoomMembershipObserver()
|
||||
|
||||
private val roomContentForwarder = RoomContentForwarder(slidingSync)
|
||||
private val roomContentForwarder = RoomContentForwarder(roomListService)
|
||||
|
||||
init {
|
||||
client.setDelegate(clientDelegate)
|
||||
rustRoomSummaryDataSource.init()
|
||||
rustInvitesDataSource.init()
|
||||
slidingSync.setObserver(slidingSyncObserverProxy)
|
||||
slidingSyncUpdateJob = slidingSyncObserverProxy.updateSummaryFlow
|
||||
.onEach { onSlidingSyncUpdate() }
|
||||
.launchIn(coroutineScope)
|
||||
syncService.syncState
|
||||
.onEach { syncState ->
|
||||
if (syncState == SyncState.Syncing) {
|
||||
onSlidingSyncUpdate()
|
||||
}
|
||||
}.launchIn(sessionCoroutineScope)
|
||||
}
|
||||
|
||||
override fun getRoom(roomId: RoomId): MatrixRoom? {
|
||||
val slidingSyncRoom = slidingSync.getRoom(roomId.value) ?: return null
|
||||
val fullRoom = slidingSyncRoom.fullRoom() ?: return null
|
||||
val roomListItem = roomListService.roomOrNull(roomId.value) ?: return null
|
||||
val fullRoom = roomListItem.fullRoom()
|
||||
return RustMatrixRoom(
|
||||
sessionId = sessionId,
|
||||
slidingSyncUpdateFlow = slidingSyncObserverProxy.updateSummaryFlow,
|
||||
slidingSyncRoom = slidingSyncRoom,
|
||||
roomListItem = roomListItem,
|
||||
innerRoom = fullRoom,
|
||||
coroutineScope = coroutineScope,
|
||||
sessionCoroutineScope = sessionCoroutineScope,
|
||||
coroutineDispatchers = dispatchers,
|
||||
clock = clock,
|
||||
systemClock = clock,
|
||||
roomContentForwarder = roomContentForwarder,
|
||||
)
|
||||
}
|
||||
|
|
@ -272,9 +181,11 @@ class RustMatrixClient constructor(
|
|||
|
||||
// Wait to receive the room back from the sync
|
||||
withTimeout(30_000L) {
|
||||
slidingSyncObserverProxy.updateSummaryFlow.filter { roomId.value in it.rooms }.first()
|
||||
roomSummaryDataSource.allRooms()
|
||||
.filter { roomSummaries ->
|
||||
roomSummaries.map { it.identifier() }.contains(roomId.value)
|
||||
}.first()
|
||||
}
|
||||
|
||||
roomId
|
||||
}
|
||||
}
|
||||
|
|
@ -304,37 +215,19 @@ class RustMatrixClient constructor(
|
|||
}
|
||||
}
|
||||
|
||||
override fun syncService(): SyncService = syncService
|
||||
|
||||
override fun sessionVerificationService(): SessionVerificationService = verificationService
|
||||
|
||||
override fun pushersService(): PushersService = pushersService
|
||||
|
||||
override fun notificationService(): NotificationService = notificationService
|
||||
|
||||
override fun startSync() {
|
||||
if (isSyncing.compareAndSet(false, true)) {
|
||||
slidingSyncObserverToken = slidingSync.sync()
|
||||
}
|
||||
}
|
||||
|
||||
override fun stopSync() {
|
||||
if (isSyncing.compareAndSet(true, false)) {
|
||||
slidingSyncObserverToken?.use { it.cancel() }
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
slidingSyncUpdateJob?.cancel()
|
||||
stopSync()
|
||||
slidingSync.setObserver(null)
|
||||
rustRoomSummaryDataSource.close()
|
||||
rustInvitesDataSource.close()
|
||||
sessionCoroutineScope.cancel()
|
||||
client.setDelegate(null)
|
||||
visibleRoomsSlidingSyncListBuilder.destroy()
|
||||
invitesSlidingSyncListBuilder.destroy()
|
||||
visibleRoomsSlidingSyncList.resetReplayCache()
|
||||
invitesSlidingSyncList.resetReplayCache()
|
||||
slidingSync.destroy()
|
||||
verificationService.destroy()
|
||||
roomListService.destroy()
|
||||
client.destroy()
|
||||
}
|
||||
|
||||
|
|
@ -378,7 +271,7 @@ class RustMatrixClient constructor(
|
|||
}
|
||||
}
|
||||
|
||||
override fun onSlidingSyncUpdate() {
|
||||
private fun onSlidingSyncUpdate() {
|
||||
if (!verificationService.isReady.value) {
|
||||
try {
|
||||
verificationService.verificationController = client.getSessionVerificationController()
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ import org.matrix.rustcomponents.sdk.AuthenticationService as RustAuthentication
|
|||
class RustMatrixAuthenticationService @Inject constructor(
|
||||
@ApplicationContext private val context: Context,
|
||||
private val baseDirectory: File,
|
||||
private val coroutineScope: CoroutineScope,
|
||||
private val appCoroutineScope: CoroutineScope,
|
||||
private val coroutineDispatchers: CoroutineDispatchers,
|
||||
private val sessionStore: SessionStore,
|
||||
private val clock: SystemClock,
|
||||
|
|
@ -180,7 +180,7 @@ class RustMatrixAuthenticationService @Inject constructor(
|
|||
return RustMatrixClient(
|
||||
client = client,
|
||||
sessionStore = sessionStore,
|
||||
coroutineScope = coroutineScope,
|
||||
appCoroutineScope = appCoroutineScope,
|
||||
dispatchers = coroutineDispatchers,
|
||||
baseDirectory = baseDirectory,
|
||||
baseCacheDirectory = context.cacheDir,
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import io.element.android.libraries.matrix.api.room.ForwardEventException
|
|||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.withTimeout
|
||||
import org.matrix.rustcomponents.sdk.Room
|
||||
import org.matrix.rustcomponents.sdk.RoomListService
|
||||
import org.matrix.rustcomponents.sdk.SlidingSync
|
||||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineListener
|
||||
|
|
@ -31,10 +32,10 @@ import kotlin.time.Duration.Companion.milliseconds
|
|||
|
||||
/**
|
||||
* Helper to forward event contents from a room to a set of other rooms.
|
||||
* @param slidingSync the [SlidingSync] to fetch room instances to forward the event to
|
||||
* @param roomListService the [RoomListService] to fetch room instances to forward the event to
|
||||
*/
|
||||
class RoomContentForwarder(
|
||||
private val slidingSync: SlidingSync,
|
||||
private val roomListService: RoomListService,
|
||||
) {
|
||||
|
||||
/**
|
||||
|
|
@ -51,7 +52,7 @@ class RoomContentForwarder(
|
|||
timeoutMs: Long = 5000L
|
||||
) {
|
||||
val content = fromRoom.getTimelineEventContentByEventId(eventId.value)
|
||||
val targetSlidingSyncRooms = toRoomIds.mapNotNull { roomId -> slidingSync.getRoom(roomId.value) }
|
||||
val targetSlidingSyncRooms = toRoomIds.mapNotNull { roomId -> roomListService.roomOrNull(roomId.value) }
|
||||
val targetRooms = targetSlidingSyncRooms.mapNotNull { slidingSyncRoom -> slidingSyncRoom.use { it.fullRoom() } }
|
||||
val failedForwardingTo = mutableSetOf<RoomId>()
|
||||
targetRooms.parallelMap { room ->
|
||||
|
|
@ -79,7 +80,7 @@ class RoomContentForwarder(
|
|||
}
|
||||
}
|
||||
|
||||
private object NoOpTimelineListener: TimelineListener {
|
||||
private object NoOpTimelineListener : TimelineListener {
|
||||
override fun onUpdate(diff: TimelineDiff) = Unit
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,77 @@
|
|||
/*
|
||||
* Copyright (c) 2023 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.room
|
||||
|
||||
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.trySendBlocking
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.buffer
|
||||
import org.matrix.rustcomponents.sdk.RoomList
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntriesListener
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntry
|
||||
import org.matrix.rustcomponents.sdk.RoomListItem
|
||||
import org.matrix.rustcomponents.sdk.RoomListLoadingState
|
||||
import org.matrix.rustcomponents.sdk.RoomListLoadingStateListener
|
||||
import org.matrix.rustcomponents.sdk.RoomListService
|
||||
import org.matrix.rustcomponents.sdk.RoomListServiceState
|
||||
import org.matrix.rustcomponents.sdk.RoomListServiceStateListener
|
||||
import timber.log.Timber
|
||||
|
||||
fun RoomList.loadingStateFlow(): Flow<RoomListLoadingState> =
|
||||
mxCallbackFlow {
|
||||
val listener = object : RoomListLoadingStateListener {
|
||||
override fun onUpdate(state: RoomListLoadingState) {
|
||||
trySendBlocking(state)
|
||||
}
|
||||
}
|
||||
val result = loadingState(listener)
|
||||
send(result.state)
|
||||
result.stateStream
|
||||
}.buffer(Channel.UNLIMITED)
|
||||
|
||||
fun RoomList.entriesFlow(onInitialList: suspend (List<RoomListEntry>) -> Unit): Flow<RoomListEntriesUpdate> =
|
||||
mxCallbackFlow {
|
||||
val listener = object : RoomListEntriesListener {
|
||||
override fun onUpdate(roomEntriesUpdate: RoomListEntriesUpdate) {
|
||||
trySendBlocking(roomEntriesUpdate)
|
||||
}
|
||||
}
|
||||
val result = entries(listener)
|
||||
onInitialList(result.entries)
|
||||
result.entriesStream
|
||||
}.buffer(Channel.UNLIMITED)
|
||||
|
||||
fun RoomListService.roomOrNull(roomId: String): RoomListItem? {
|
||||
return try {
|
||||
room(roomId)
|
||||
} catch (failure: Throwable) {
|
||||
Timber.e(failure, "Failed finding room with id=$roomId")
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
fun RoomListService.stateFlow(): Flow<RoomListServiceState> =
|
||||
mxCallbackFlow {
|
||||
val listener = object : RoomListServiceStateListener {
|
||||
override fun onUpdate(state: RoomListServiceState) {
|
||||
trySendBlocking(state)
|
||||
}
|
||||
}
|
||||
state(listener)
|
||||
}.buffer(Channel.UNLIMITED)
|
||||
|
|
@ -20,27 +20,24 @@ import io.element.android.libraries.matrix.api.core.RoomId
|
|||
import io.element.android.libraries.matrix.api.room.RoomSummaryDetails
|
||||
import io.element.android.libraries.matrix.impl.room.message.RoomMessageFactory
|
||||
import org.matrix.rustcomponents.sdk.Room
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncRoom
|
||||
import org.matrix.rustcomponents.sdk.RoomListItem
|
||||
|
||||
class RoomSummaryDetailsFactory(private val roomMessageFactory: RoomMessageFactory = RoomMessageFactory()) {
|
||||
|
||||
fun create(slidingSyncRoom: SlidingSyncRoom, room: Room?): RoomSummaryDetails {
|
||||
val latestRoomMessage = slidingSyncRoom.latestRoomMessage()?.use {
|
||||
fun create(roomListItem: RoomListItem, room: Room?): RoomSummaryDetails {
|
||||
val latestRoomMessage = roomListItem.latestEvent()?.use {
|
||||
roomMessageFactory.create(it)
|
||||
}
|
||||
|
||||
return RoomSummaryDetails(
|
||||
roomId = RoomId(slidingSyncRoom.roomId()),
|
||||
name = slidingSyncRoom.name() ?: slidingSyncRoom.roomId(),
|
||||
roomId = RoomId(roomListItem.id()),
|
||||
name = roomListItem.name() ?: roomListItem.id(),
|
||||
canonicalAlias = room?.canonicalAlias(),
|
||||
isDirect = room?.isDirect() ?: false,
|
||||
avatarURLString = room?.avatarUrl(),
|
||||
unreadNotificationCount = slidingSyncRoom.unreadNotifications().use { it.notificationCount().toInt() },
|
||||
unreadNotificationCount = roomListItem.unreadNotifications().use { it.notificationCount().toInt() },
|
||||
lastMessage = latestRoomMessage,
|
||||
lastMessageTimestamp = latestRoomMessage?.originServerTs,
|
||||
inviter = room?.inviter()?.let(RoomMemberMapper::map),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,132 @@
|
|||
/*
|
||||
* Copyright (c) 2023 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.room
|
||||
|
||||
import io.element.android.libraries.matrix.api.room.RoomSummary
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntry
|
||||
import org.matrix.rustcomponents.sdk.RoomListService
|
||||
import timber.log.Timber
|
||||
import java.util.UUID
|
||||
|
||||
class RoomSummaryListProcessor(
|
||||
private val roomSummaries: MutableStateFlow<List<RoomSummary>>,
|
||||
private val roomListService: RoomListService,
|
||||
private val roomSummaryDetailsFactory: RoomSummaryDetailsFactory = RoomSummaryDetailsFactory(),
|
||||
) {
|
||||
|
||||
private val roomSummariesByIdentifier = HashMap<String, RoomSummary>()
|
||||
private val initLatch = CompletableDeferred<Unit>()
|
||||
private val mutex = Mutex()
|
||||
|
||||
suspend fun postEntries(entries: List<RoomListEntry>) {
|
||||
updateRoomSummaries {
|
||||
Timber.v("Update rooms from postEntries (with ${entries.size} items) on ${Thread.currentThread()}")
|
||||
addAll(entries.map(::buildSummaryForRoomListEntry))
|
||||
}
|
||||
initLatch.complete(Unit)
|
||||
}
|
||||
|
||||
suspend fun postUpdate(update: RoomListEntriesUpdate) {
|
||||
// Makes sure to process first entries before update.
|
||||
initLatch.await()
|
||||
updateRoomSummaries {
|
||||
Timber.v("Update rooms from postUpdate ($update) on ${Thread.currentThread()}")
|
||||
applyUpdate(update)
|
||||
}
|
||||
}
|
||||
|
||||
private fun MutableList<RoomSummary>.applyUpdate(update: RoomListEntriesUpdate) {
|
||||
when (update) {
|
||||
is RoomListEntriesUpdate.Append -> {
|
||||
val roomSummaries = update.values.map {
|
||||
buildSummaryForRoomListEntry(it)
|
||||
}
|
||||
addAll(roomSummaries)
|
||||
}
|
||||
is RoomListEntriesUpdate.PushBack -> {
|
||||
val roomSummary = buildSummaryForRoomListEntry(update.value)
|
||||
add(roomSummary)
|
||||
}
|
||||
is RoomListEntriesUpdate.PushFront -> {
|
||||
val roomSummary = buildSummaryForRoomListEntry(update.value)
|
||||
add(0, roomSummary)
|
||||
}
|
||||
is RoomListEntriesUpdate.Set -> {
|
||||
val roomSummary = buildSummaryForRoomListEntry(update.value)
|
||||
this[update.index.toInt()] = roomSummary
|
||||
}
|
||||
is RoomListEntriesUpdate.Insert -> {
|
||||
val roomSummary = buildSummaryForRoomListEntry(update.value)
|
||||
add(update.index.toInt(), roomSummary)
|
||||
}
|
||||
is RoomListEntriesUpdate.Remove -> {
|
||||
removeAt(update.index.toInt())
|
||||
}
|
||||
is RoomListEntriesUpdate.Reset -> {
|
||||
clear()
|
||||
addAll(update.values.map { buildSummaryForRoomListEntry(it) })
|
||||
}
|
||||
RoomListEntriesUpdate.PopBack -> {
|
||||
removeLastOrNull()
|
||||
}
|
||||
RoomListEntriesUpdate.PopFront -> {
|
||||
removeFirstOrNull()
|
||||
}
|
||||
RoomListEntriesUpdate.Clear -> {
|
||||
clear()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun buildSummaryForRoomListEntry(entry: RoomListEntry): RoomSummary {
|
||||
return when (entry) {
|
||||
RoomListEntry.Empty -> buildEmptyRoomSummary()
|
||||
is RoomListEntry.Filled -> buildAndCacheRoomSummaryForIdentifier(entry.roomId)
|
||||
is RoomListEntry.Invalidated -> {
|
||||
roomSummariesByIdentifier[entry.roomId] ?: buildEmptyRoomSummary()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun buildEmptyRoomSummary(): RoomSummary {
|
||||
return RoomSummary.Empty(UUID.randomUUID().toString())
|
||||
}
|
||||
|
||||
private fun buildAndCacheRoomSummaryForIdentifier(identifier: String): RoomSummary {
|
||||
val builtRoomSummary = roomListService.roomOrNull(identifier)?.use { roomListItem ->
|
||||
roomListItem.fullRoom().use { fullRoom ->
|
||||
RoomSummary.Filled(
|
||||
details = roomSummaryDetailsFactory.create(roomListItem, fullRoom)
|
||||
)
|
||||
}
|
||||
} ?: buildEmptyRoomSummary()
|
||||
roomSummariesByIdentifier[builtRoomSummary.identifier()] = builtRoomSummary
|
||||
return builtRoomSummary
|
||||
}
|
||||
|
||||
private suspend fun updateRoomSummaries(block: MutableList<RoomSummary>.() -> Unit) =
|
||||
mutex.withLock {
|
||||
val mutableRoomSummaries = roomSummaries.value.toMutableList()
|
||||
block(mutableRoomSummaries)
|
||||
roomSummaries.value = mutableRoomSummaries
|
||||
}
|
||||
}
|
||||
|
|
@ -17,6 +17,7 @@
|
|||
package io.element.android.libraries.matrix.impl.room
|
||||
|
||||
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
|
||||
import io.element.android.libraries.core.coroutine.childScope
|
||||
import io.element.android.libraries.matrix.api.core.EventId
|
||||
import io.element.android.libraries.matrix.api.core.ProgressCallback
|
||||
import io.element.android.libraries.matrix.api.core.RoomId
|
||||
|
|
@ -32,22 +33,26 @@ import io.element.android.libraries.matrix.api.room.MessageEventType
|
|||
import io.element.android.libraries.matrix.api.room.StateEventType
|
||||
import io.element.android.libraries.matrix.api.room.roomMembers
|
||||
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
|
||||
import io.element.android.libraries.matrix.api.timeline.item.event.EventType
|
||||
import io.element.android.libraries.matrix.impl.core.toProgressWatcher
|
||||
import io.element.android.libraries.matrix.impl.media.map
|
||||
import io.element.android.libraries.matrix.impl.timeline.RustMatrixTimeline
|
||||
import io.element.android.libraries.matrix.impl.timeline.timelineDiffFlow
|
||||
import io.element.android.services.toolbox.api.systemclock.SystemClock
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.onStart
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.RequiredState
|
||||
import org.matrix.rustcomponents.sdk.Room
|
||||
import org.matrix.rustcomponents.sdk.RoomListItem
|
||||
import org.matrix.rustcomponents.sdk.RoomMember
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncRoom
|
||||
import org.matrix.rustcomponents.sdk.UpdateSummary
|
||||
import org.matrix.rustcomponents.sdk.RoomSubscription
|
||||
import org.matrix.rustcomponents.sdk.genTransactionId
|
||||
import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown
|
||||
import timber.log.Timber
|
||||
|
|
@ -55,55 +60,80 @@ import java.io.File
|
|||
|
||||
class RustMatrixRoom(
|
||||
override val sessionId: SessionId,
|
||||
private val slidingSyncUpdateFlow: Flow<UpdateSummary>,
|
||||
private val slidingSyncRoom: SlidingSyncRoom,
|
||||
private val roomListItem: RoomListItem,
|
||||
private val innerRoom: Room,
|
||||
private val coroutineScope: CoroutineScope,
|
||||
sessionCoroutineScope: CoroutineScope,
|
||||
private val coroutineDispatchers: CoroutineDispatchers,
|
||||
private val clock: SystemClock,
|
||||
private val systemClock: SystemClock,
|
||||
private val roomContentForwarder: RoomContentForwarder,
|
||||
) : MatrixRoom {
|
||||
|
||||
override val roomId = RoomId(innerRoom.id())
|
||||
|
||||
private val roomCoroutineScope = sessionCoroutineScope.childScope(coroutineDispatchers.main, "RoomScope-$roomId")
|
||||
|
||||
override val membersStateFlow: StateFlow<MatrixRoomMembersState>
|
||||
get() = _membersStateFlow
|
||||
|
||||
private var _membersStateFlow = MutableStateFlow<MatrixRoomMembersState>(MatrixRoomMembersState.Unknown)
|
||||
private val isInit = MutableStateFlow(false)
|
||||
private val syncUpdateFlow = MutableStateFlow(systemClock.epochMillis())
|
||||
|
||||
private val timeline by lazy {
|
||||
RustMatrixTimeline(
|
||||
matrixRoom = this,
|
||||
innerRoom = innerRoom,
|
||||
slidingSyncRoom = slidingSyncRoom,
|
||||
coroutineScope = coroutineScope,
|
||||
roomCoroutineScope = roomCoroutineScope,
|
||||
coroutineDispatchers = coroutineDispatchers
|
||||
)
|
||||
}
|
||||
|
||||
override fun syncUpdateFlow(): Flow<Long> {
|
||||
return slidingSyncUpdateFlow
|
||||
.filter {
|
||||
it.rooms.contains(roomId.value)
|
||||
}
|
||||
.map {
|
||||
clock.epochMillis()
|
||||
}
|
||||
.onStart { emit(clock.epochMillis()) }
|
||||
return syncUpdateFlow
|
||||
}
|
||||
|
||||
override fun timeline(): MatrixTimeline {
|
||||
return timeline
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
innerRoom.destroy()
|
||||
slidingSyncRoom.destroy()
|
||||
override fun open(): Result<Unit> {
|
||||
if (isInit.value) return Result.failure(IllegalStateException("Listener already registered"))
|
||||
val settings = RoomSubscription(
|
||||
requiredState = listOf(
|
||||
RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_TOPIC, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_JOIN_RULES, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_POWER_LEVELS, value = ""),
|
||||
),
|
||||
timelineLimit = null
|
||||
)
|
||||
roomListItem.subscribe(settings)
|
||||
roomCoroutineScope.launch(coroutineDispatchers.computation) {
|
||||
innerRoom.timelineDiffFlow { initialList ->
|
||||
timeline.postItems(initialList)
|
||||
}.onEach {
|
||||
syncUpdateFlow.value = systemClock.epochMillis()
|
||||
timeline.postDiff(it)
|
||||
}.launchIn(this)
|
||||
fetchMembers()
|
||||
}
|
||||
isInit.value = true
|
||||
return Result.success(Unit)
|
||||
}
|
||||
|
||||
override val roomId = RoomId(innerRoom.id())
|
||||
override fun close() {
|
||||
if (isInit.value) {
|
||||
isInit.value = false
|
||||
roomCoroutineScope.cancel()
|
||||
roomListItem.unsubscribe()
|
||||
innerRoom.destroy()
|
||||
roomListItem.destroy()
|
||||
}
|
||||
}
|
||||
|
||||
override val name: String?
|
||||
get() {
|
||||
return slidingSyncRoom.name()
|
||||
return roomListItem.name()
|
||||
}
|
||||
|
||||
override val bestName: String
|
||||
|
|
@ -330,6 +360,12 @@ class RustMatrixRoom(
|
|||
}
|
||||
}
|
||||
|
||||
private suspend fun fetchMembers() = withContext(coroutineDispatchers.io) {
|
||||
runCatching {
|
||||
innerRoom.fetchMembers()
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun reportContent(eventId: EventId, reason: String, blockUserId: UserId?): Result<Unit> = withContext(coroutineDispatchers.io) {
|
||||
runCatching {
|
||||
innerRoom.reportContent(eventId = eventId.value, score = null, reason = reason)
|
||||
|
|
|
|||
|
|
@ -19,186 +19,104 @@ package io.element.android.libraries.matrix.impl.room
|
|||
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
|
||||
import io.element.android.libraries.matrix.api.room.RoomSummary
|
||||
import io.element.android.libraries.matrix.api.room.RoomSummaryDataSource
|
||||
import io.element.android.libraries.matrix.impl.sync.roomListDiff
|
||||
import io.element.android.libraries.matrix.impl.sync.state
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.flow.firstOrNull
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntry
|
||||
import org.matrix.rustcomponents.sdk.SlidingSync
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncList
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListRoomsListDiff
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncSelectiveModeBuilder
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListLoadingState
|
||||
import org.matrix.rustcomponents.sdk.UpdateSummary
|
||||
import org.matrix.rustcomponents.sdk.RoomList
|
||||
import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate
|
||||
import org.matrix.rustcomponents.sdk.RoomListException
|
||||
import org.matrix.rustcomponents.sdk.RoomListInput
|
||||
import org.matrix.rustcomponents.sdk.RoomListLoadingState
|
||||
import org.matrix.rustcomponents.sdk.RoomListRange
|
||||
import org.matrix.rustcomponents.sdk.RoomListService
|
||||
import org.matrix.rustcomponents.sdk.RoomListServiceState
|
||||
import timber.log.Timber
|
||||
import java.io.Closeable
|
||||
import java.util.UUID
|
||||
|
||||
internal class RustRoomSummaryDataSource(
|
||||
private val slidingSyncUpdateFlow: Flow<UpdateSummary>,
|
||||
private val slidingSync: SlidingSync,
|
||||
private val slidingSyncListFlow: Flow<SlidingSyncList>,
|
||||
private val coroutineDispatchers: CoroutineDispatchers,
|
||||
private val roomSummaryDetailsFactory: RoomSummaryDetailsFactory = RoomSummaryDetailsFactory(),
|
||||
) : RoomSummaryDataSource, Closeable {
|
||||
private val roomListService: RoomListService,
|
||||
private val sessionCoroutineScope: CoroutineScope,
|
||||
coroutineDispatchers: CoroutineDispatchers,
|
||||
roomSummaryDetailsFactory: RoomSummaryDetailsFactory = RoomSummaryDetailsFactory(),
|
||||
) : RoomSummaryDataSource {
|
||||
|
||||
private val coroutineScope = CoroutineScope(SupervisorJob() + coroutineDispatchers.io)
|
||||
private val allRooms = MutableStateFlow<List<RoomSummary>>(emptyList())
|
||||
private val inviteRooms = MutableStateFlow<List<RoomSummary>>(emptyList())
|
||||
|
||||
private val roomSummaries = MutableStateFlow<List<RoomSummary>>(emptyList())
|
||||
private val state = MutableStateFlow(SlidingSyncListLoadingState.NOT_LOADED)
|
||||
private val allRoomsLoadingState: MutableStateFlow<RoomSummaryDataSource.LoadingState> = MutableStateFlow(RoomSummaryDataSource.LoadingState.NotLoaded)
|
||||
private val allRoomsListProcessor = RoomSummaryListProcessor(allRooms, roomListService, roomSummaryDetailsFactory)
|
||||
private val inviteRoomsListProcessor = RoomSummaryListProcessor(inviteRooms, roomListService, roomSummaryDetailsFactory)
|
||||
|
||||
fun init() {
|
||||
coroutineScope.launch {
|
||||
val slidingSyncList = slidingSyncListFlow.first()
|
||||
val summaries = slidingSyncList.currentRoomList().map(::buildSummaryForRoomListEntry)
|
||||
updateRoomSummaries {
|
||||
addAll(summaries)
|
||||
}
|
||||
|
||||
slidingSyncList.roomListDiff(this)
|
||||
.onEach { diffs ->
|
||||
updateRoomSummaries {
|
||||
applyDiff(diffs)
|
||||
}
|
||||
}
|
||||
init {
|
||||
sessionCoroutineScope.launch(coroutineDispatchers.computation) {
|
||||
val allRooms = roomListService.allRooms()
|
||||
allRooms
|
||||
.observeEntriesWithProcessor(allRoomsListProcessor)
|
||||
.launchIn(this)
|
||||
|
||||
slidingSyncList.state(this)
|
||||
.onEach { SlidingSyncListLoadingState ->
|
||||
Timber.v("New sliding sync state: $SlidingSyncListLoadingState")
|
||||
state.value = SlidingSyncListLoadingState
|
||||
allRooms
|
||||
.loadingStateFlow()
|
||||
.map { it.toRoomSummaryDataSourceLoadingState() }
|
||||
.onEach {
|
||||
allRoomsLoadingState.value = it
|
||||
}.launchIn(this)
|
||||
|
||||
launch {
|
||||
// Wait until running, as invites is only available after that
|
||||
roomListService.stateFlow().first {
|
||||
it == RoomListServiceState.RUNNING
|
||||
}
|
||||
roomListService.invites()
|
||||
.observeEntriesWithProcessor(inviteRoomsListProcessor)
|
||||
.launchIn(this)
|
||||
}
|
||||
}
|
||||
|
||||
slidingSyncUpdateFlow
|
||||
.onEach {
|
||||
didReceiveSyncUpdate(it)
|
||||
}.launchIn(coroutineScope)
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
runBlocking { slidingSyncListFlow.firstOrNull() }?.close()
|
||||
coroutineScope.cancel()
|
||||
override fun allRooms(): StateFlow<List<RoomSummary>> {
|
||||
return allRooms
|
||||
}
|
||||
|
||||
override fun roomSummaries(): StateFlow<List<RoomSummary>> {
|
||||
return roomSummaries
|
||||
override fun inviteRooms(): StateFlow<List<RoomSummary>> {
|
||||
return inviteRooms
|
||||
}
|
||||
|
||||
override fun setSlidingSyncRange(range: IntRange) {
|
||||
override fun allRoomsLoadingState(): StateFlow<RoomSummaryDataSource.LoadingState> {
|
||||
return allRoomsLoadingState
|
||||
}
|
||||
|
||||
override fun updateRoomListVisibleRange(range: IntRange) {
|
||||
Timber.v("setVisibleRange=$range")
|
||||
coroutineScope.launch {
|
||||
val slidingSyncMode = SlidingSyncSelectiveModeBuilder()
|
||||
.addRange(range.first.toUInt(), range.last.toUInt())
|
||||
slidingSyncListFlow.first().setSyncMode(slidingSyncMode)
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun didReceiveSyncUpdate(summary: UpdateSummary) {
|
||||
Timber.v("UpdateRooms with identifiers: ${summary.rooms}")
|
||||
if (state.value != SlidingSyncListLoadingState.FULLY_LOADED) {
|
||||
return
|
||||
}
|
||||
updateRoomSummaries {
|
||||
for (identifier in summary.rooms) {
|
||||
val index = indexOfFirst { it.identifier() == identifier }
|
||||
if (index == -1) {
|
||||
continue
|
||||
}
|
||||
val updatedRoomSummary = buildRoomSummaryForIdentifier(identifier)
|
||||
set(index, updatedRoomSummary)
|
||||
sessionCoroutineScope.launch {
|
||||
try {
|
||||
val ranges = listOf(RoomListRange(range.first.toUInt(), range.last.toUInt()))
|
||||
roomListService.applyInput(
|
||||
RoomListInput.Viewport(ranges)
|
||||
)
|
||||
} catch (exception: RoomListException) {
|
||||
Timber.e(exception, "Failed updating visible range")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun MutableList<RoomSummary>.applyDiff(diff: SlidingSyncListRoomsListDiff) {
|
||||
fun MutableList<RoomSummary>.fillUntil(untilIndex: Int) {
|
||||
repeat((size - 1 until untilIndex).count()) {
|
||||
add(buildEmptyRoomSummary())
|
||||
}
|
||||
}
|
||||
Timber.v("ApplyDiff: $diff for list with size: $size")
|
||||
when (diff) {
|
||||
is SlidingSyncListRoomsListDiff.Append -> {
|
||||
val roomSummaries = diff.values.map {
|
||||
buildSummaryForRoomListEntry(it)
|
||||
}
|
||||
addAll(roomSummaries)
|
||||
}
|
||||
is SlidingSyncListRoomsListDiff.PushBack -> {
|
||||
val roomSummary = buildSummaryForRoomListEntry(diff.value)
|
||||
add(roomSummary)
|
||||
}
|
||||
is SlidingSyncListRoomsListDiff.PushFront -> {
|
||||
val roomSummary = buildSummaryForRoomListEntry(diff.value)
|
||||
add(0, roomSummary)
|
||||
}
|
||||
is SlidingSyncListRoomsListDiff.Set -> {
|
||||
fillUntil(diff.index.toInt())
|
||||
val roomSummary = buildSummaryForRoomListEntry(diff.value)
|
||||
set(diff.index.toInt(), roomSummary)
|
||||
}
|
||||
is SlidingSyncListRoomsListDiff.Insert -> {
|
||||
val roomSummary = buildSummaryForRoomListEntry(diff.value)
|
||||
add(diff.index.toInt(), roomSummary)
|
||||
}
|
||||
is SlidingSyncListRoomsListDiff.Remove -> {
|
||||
removeAt(diff.index.toInt())
|
||||
}
|
||||
is SlidingSyncListRoomsListDiff.Reset -> {
|
||||
clear()
|
||||
addAll(diff.values.map { buildSummaryForRoomListEntry(it) })
|
||||
}
|
||||
SlidingSyncListRoomsListDiff.PopBack -> {
|
||||
removeFirstOrNull()
|
||||
}
|
||||
SlidingSyncListRoomsListDiff.PopFront -> {
|
||||
removeLastOrNull()
|
||||
}
|
||||
SlidingSyncListRoomsListDiff.Clear -> {
|
||||
clear()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun buildSummaryForRoomListEntry(entry: RoomListEntry): RoomSummary {
|
||||
return when (entry) {
|
||||
RoomListEntry.Empty -> buildEmptyRoomSummary()
|
||||
is RoomListEntry.Invalidated -> buildRoomSummaryForIdentifier(entry.roomId)
|
||||
is RoomListEntry.Filled -> buildRoomSummaryForIdentifier(entry.roomId)
|
||||
}
|
||||
}
|
||||
|
||||
private fun buildEmptyRoomSummary(): RoomSummary {
|
||||
return RoomSummary.Empty(UUID.randomUUID().toString())
|
||||
}
|
||||
|
||||
private fun buildRoomSummaryForIdentifier(identifier: String): RoomSummary {
|
||||
val slidingSyncRoom = slidingSync.getRoom(identifier) ?: return RoomSummary.Empty(identifier)
|
||||
val fullRoom = slidingSyncRoom.fullRoom()
|
||||
val roomSummary = RoomSummary.Filled(
|
||||
details = roomSummaryDetailsFactory.create(slidingSyncRoom, fullRoom)
|
||||
)
|
||||
fullRoom?.destroy()
|
||||
slidingSyncRoom.destroy()
|
||||
return roomSummary
|
||||
}
|
||||
|
||||
private suspend fun updateRoomSummaries(block: MutableList<RoomSummary>.() -> Unit) =
|
||||
withContext(coroutineDispatchers.diffUpdateDispatcher) {
|
||||
val mutableRoomSummaries = roomSummaries.value.toMutableList()
|
||||
block(mutableRoomSummaries)
|
||||
roomSummaries.value = mutableRoomSummaries
|
||||
}
|
||||
}
|
||||
|
||||
private fun RoomListLoadingState.toRoomSummaryDataSourceLoadingState(): RoomSummaryDataSource.LoadingState {
|
||||
return when (this) {
|
||||
is RoomListLoadingState.Loaded -> RoomSummaryDataSource.LoadingState.Loaded(maximumNumberOfRooms?.toInt() ?: 0)
|
||||
is RoomListLoadingState.NotLoaded -> RoomSummaryDataSource.LoadingState.NotLoaded
|
||||
}
|
||||
}
|
||||
|
||||
private fun RoomList.observeEntriesWithProcessor(processor: RoomSummaryListProcessor): Flow<RoomListEntriesUpdate> {
|
||||
return entriesFlow { roomListEntries ->
|
||||
processor.postEntries(roomListEntries)
|
||||
}.onEach { update ->
|
||||
processor.postUpdate(update)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Copyright (c) 2023 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.sync
|
||||
|
||||
import io.element.android.libraries.matrix.api.sync.SyncState
|
||||
import org.matrix.rustcomponents.sdk.RoomListServiceState
|
||||
|
||||
internal fun RoomListServiceState.toSyncState(): SyncState {
|
||||
return when (this) {
|
||||
RoomListServiceState.INIT,
|
||||
RoomListServiceState.SETTING_UP -> SyncState.Idle
|
||||
RoomListServiceState.RUNNING -> SyncState.Syncing
|
||||
RoomListServiceState.ERROR -> SyncState.InError
|
||||
RoomListServiceState.TERMINATED -> SyncState.Terminated
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Copyright (c) 2023 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.sync
|
||||
|
||||
import io.element.android.libraries.matrix.api.sync.SyncService
|
||||
import io.element.android.libraries.matrix.api.sync.SyncState
|
||||
import io.element.android.libraries.matrix.impl.room.stateFlow
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.stateIn
|
||||
import org.matrix.rustcomponents.sdk.RoomListService
|
||||
import org.matrix.rustcomponents.sdk.RoomListServiceState
|
||||
|
||||
class RustSyncService(
|
||||
private val roomListService: RoomListService,
|
||||
sessionCoroutineScope: CoroutineScope
|
||||
) : SyncService {
|
||||
|
||||
override fun startSync() = runCatching {
|
||||
if (!roomListService.isSyncing()) {
|
||||
roomListService.sync()
|
||||
}
|
||||
}
|
||||
|
||||
override fun stopSync() = runCatching {
|
||||
if (roomListService.isSyncing()) {
|
||||
roomListService.stopSync()
|
||||
}
|
||||
}
|
||||
|
||||
override val syncState: StateFlow<SyncState> =
|
||||
roomListService
|
||||
.stateFlow()
|
||||
.map(RoomListServiceState::toSyncState)
|
||||
.distinctUntilChanged()
|
||||
.stateIn(sessionCoroutineScope, SharingStarted.WhileSubscribed(), SyncState.Idle)
|
||||
}
|
||||
|
|
@ -1,62 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2023 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.sync
|
||||
|
||||
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.launch
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncList
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListLoadingState
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListRoomListObserver
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListRoomsCountObserver
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListRoomsListDiff
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncListStateObserver
|
||||
|
||||
fun SlidingSyncList.roomListDiff(scope: CoroutineScope): Flow<SlidingSyncListRoomsListDiff> =
|
||||
mxCallbackFlow {
|
||||
val observer = object : SlidingSyncListRoomListObserver {
|
||||
override fun didReceiveUpdate(diff: SlidingSyncListRoomsListDiff) {
|
||||
scope.launch {
|
||||
send(diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
observeRoomList(observer)
|
||||
}
|
||||
|
||||
fun SlidingSyncList.state(scope: CoroutineScope): Flow<SlidingSyncListLoadingState> = mxCallbackFlow {
|
||||
val observer = object : SlidingSyncListStateObserver {
|
||||
override fun didReceiveUpdate(newState: SlidingSyncListLoadingState) {
|
||||
scope.launch {
|
||||
send(newState)
|
||||
}
|
||||
}
|
||||
}
|
||||
observeState(observer)
|
||||
}
|
||||
|
||||
fun SlidingSyncList.roomsCount(scope: CoroutineScope): Flow<UInt> = mxCallbackFlow {
|
||||
val observer = object : SlidingSyncListRoomsCountObserver {
|
||||
override fun didReceiveUpdate(count: UInt) {
|
||||
scope.launch {
|
||||
send(count)
|
||||
}
|
||||
}
|
||||
}
|
||||
observeRoomsCount(observer)
|
||||
}
|
||||
|
|
@ -1,43 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2022 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.sync
|
||||
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
import kotlinx.coroutines.flow.asSharedFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncObserver
|
||||
import org.matrix.rustcomponents.sdk.UpdateSummary
|
||||
|
||||
// Sounds like a reasonable buffer size before it suspends emitting new items.
|
||||
private const val BUFFER_SIZE = 64
|
||||
|
||||
class SlidingSyncObserverProxy(
|
||||
private val coroutineScope: CoroutineScope,
|
||||
) : SlidingSyncObserver {
|
||||
|
||||
private val updateSummaryMutableFlow =
|
||||
MutableSharedFlow<UpdateSummary>(extraBufferCapacity = BUFFER_SIZE)
|
||||
val updateSummaryFlow: SharedFlow<UpdateSummary> = updateSummaryMutableFlow.asSharedFlow()
|
||||
|
||||
override fun didReceiveSyncUpdate(summary: UpdateSummary) {
|
||||
coroutineScope.launch {
|
||||
updateSummaryMutableFlow.emit(summary)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -19,33 +19,38 @@ package io.element.android.libraries.matrix.impl.timeline
|
|||
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
|
||||
import io.element.android.libraries.matrix.api.timeline.MatrixTimelineItem
|
||||
import io.element.android.libraries.matrix.api.timeline.item.virtual.VirtualTimelineItem
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import org.matrix.rustcomponents.sdk.TimelineChange
|
||||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineItem
|
||||
import org.matrix.rustcomponents.sdk.TimelineListener
|
||||
|
||||
internal class MatrixTimelineDiffProcessor(
|
||||
private val paginationState: MutableStateFlow<MatrixTimeline.PaginationState>,
|
||||
private val timelineItems: MutableStateFlow<List<MatrixTimelineItem>>,
|
||||
private val coroutineScope: CoroutineScope,
|
||||
private val diffDispatcher: CoroutineDispatcher,
|
||||
private val timelineItemFactory: MatrixTimelineItemMapper,
|
||||
) : TimelineListener {
|
||||
) {
|
||||
|
||||
override fun onUpdate(update: TimelineDiff) {
|
||||
coroutineScope.launch {
|
||||
updateTimelineItems {
|
||||
applyDiff(update)
|
||||
}
|
||||
when (val firstItem = timelineItems.value.firstOrNull()) {
|
||||
is MatrixTimelineItem.Virtual -> updateBackPaginationState(firstItem.virtual)
|
||||
else -> updateBackPaginationState(null)
|
||||
}
|
||||
private val initLatch = CompletableDeferred<Unit>()
|
||||
private val mutex = Mutex()
|
||||
|
||||
suspend fun postItems(items: List<TimelineItem>) {
|
||||
updateTimelineItems {
|
||||
val mappedItems = items.map { it.asMatrixTimelineItem() }
|
||||
addAll(mappedItems)
|
||||
updateBackPaginationState()
|
||||
}
|
||||
initLatch.complete(Unit)
|
||||
}
|
||||
|
||||
suspend fun postDiff(diff: TimelineDiff) {
|
||||
// Makes sure to process first items before diff.
|
||||
initLatch.await()
|
||||
updateTimelineItems {
|
||||
applyDiff(diff)
|
||||
updateBackPaginationState()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -69,7 +74,7 @@ internal class MatrixTimelineDiffProcessor(
|
|||
}
|
||||
|
||||
private suspend fun updateTimelineItems(block: MutableList<MatrixTimelineItem>.() -> Unit) =
|
||||
withContext(diffDispatcher) {
|
||||
mutex.withLock {
|
||||
val mutableTimelineItems = timelineItems.value.toMutableList()
|
||||
block(mutableTimelineItems)
|
||||
timelineItems.value = mutableTimelineItems
|
||||
|
|
@ -120,8 +125,14 @@ internal class MatrixTimelineDiffProcessor(
|
|||
}
|
||||
}
|
||||
|
||||
private fun List<MatrixTimelineItem>.updateBackPaginationState() {
|
||||
when (val firstItem = firstOrNull()) {
|
||||
is MatrixTimelineItem.Virtual -> updateBackPaginationState(firstItem.virtual)
|
||||
else -> updateBackPaginationState(null)
|
||||
}
|
||||
}
|
||||
|
||||
private fun TimelineItem.asMatrixTimelineItem(): MatrixTimelineItem {
|
||||
return timelineItemFactory.map(this)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ import org.matrix.rustcomponents.sdk.TimelineItem
|
|||
|
||||
class MatrixTimelineItemMapper(
|
||||
private val fetchDetailsForEvent: suspend (EventId) -> Result<Unit>,
|
||||
private val coroutineScope: CoroutineScope,
|
||||
private val roomCoroutineScope: CoroutineScope,
|
||||
private val virtualTimelineItemMapper: VirtualTimelineItemMapper = VirtualTimelineItemMapper(),
|
||||
private val eventTimelineItemMapper: EventTimelineItemMapper = EventTimelineItemMapper(),
|
||||
) {
|
||||
|
|
@ -51,7 +51,7 @@ class MatrixTimelineItemMapper(
|
|||
return MatrixTimelineItem.Other
|
||||
}
|
||||
|
||||
private fun fetchEventDetails(eventId: EventId) = coroutineScope.launch {
|
||||
private fun fetchEventDetails(eventId: EventId) = roomCoroutineScope.launch {
|
||||
fetchDetailsForEvent(eventId)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,40 +21,30 @@ import io.element.android.libraries.matrix.api.core.EventId
|
|||
import io.element.android.libraries.matrix.api.room.MatrixRoom
|
||||
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
|
||||
import io.element.android.libraries.matrix.api.timeline.MatrixTimelineItem
|
||||
import io.element.android.libraries.matrix.api.timeline.item.event.EventType
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.event.EventMessageMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.event.EventTimelineItemMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.event.TimelineEventContentMapper
|
||||
import io.element.android.libraries.matrix.impl.timeline.item.virtual.VirtualTimelineItemMapper
|
||||
import io.element.android.libraries.matrix.impl.util.TaskHandleBag
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.sample
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.rustcomponents.sdk.PaginationOptions
|
||||
import org.matrix.rustcomponents.sdk.RequiredState
|
||||
import org.matrix.rustcomponents.sdk.Room
|
||||
import org.matrix.rustcomponents.sdk.RoomSubscription
|
||||
import org.matrix.rustcomponents.sdk.SlidingSyncRoom
|
||||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineItem
|
||||
import org.matrix.rustcomponents.sdk.TimelineListener
|
||||
import timber.log.Timber
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
class RustMatrixTimeline(
|
||||
roomCoroutineScope: CoroutineScope,
|
||||
private val matrixRoom: MatrixRoom,
|
||||
private val innerRoom: Room,
|
||||
private val slidingSyncRoom: SlidingSyncRoom,
|
||||
private val coroutineScope: CoroutineScope,
|
||||
private val coroutineDispatchers: CoroutineDispatchers,
|
||||
) : MatrixTimeline {
|
||||
|
||||
private val isInit = AtomicBoolean(false)
|
||||
|
||||
private val timelineItems: MutableStateFlow<List<MatrixTimelineItem>> =
|
||||
MutableStateFlow(emptyList())
|
||||
|
||||
|
|
@ -64,7 +54,7 @@ class RustMatrixTimeline(
|
|||
|
||||
private val timelineItemFactory = MatrixTimelineItemMapper(
|
||||
fetchDetailsForEvent = this::fetchDetailsForEvent,
|
||||
coroutineScope = coroutineScope,
|
||||
roomCoroutineScope = roomCoroutineScope,
|
||||
virtualTimelineItemMapper = VirtualTimelineItemMapper(),
|
||||
eventTimelineItemMapper = EventTimelineItemMapper(
|
||||
contentMapper = TimelineEventContentMapper(
|
||||
|
|
@ -73,15 +63,12 @@ class RustMatrixTimeline(
|
|||
)
|
||||
)
|
||||
|
||||
private val innerTimelineListener = MatrixTimelineDiffProcessor(
|
||||
private val timelineDiffProcessor = MatrixTimelineDiffProcessor(
|
||||
paginationState = paginationState,
|
||||
timelineItems = timelineItems,
|
||||
coroutineScope = coroutineScope,
|
||||
diffDispatcher = coroutineDispatchers.diffUpdateDispatcher,
|
||||
timelineItemFactory = timelineItemFactory,
|
||||
)
|
||||
|
||||
private val listenerTokens = TaskHandleBag()
|
||||
override fun paginationState(): StateFlow<MatrixTimeline.PaginationState> {
|
||||
return paginationState
|
||||
}
|
||||
|
|
@ -91,43 +78,12 @@ class RustMatrixTimeline(
|
|||
return timelineItems.sample(50)
|
||||
}
|
||||
|
||||
override fun initialize() {
|
||||
Timber.v("Init timeline for room ${matrixRoom.roomId}")
|
||||
coroutineScope.launch {
|
||||
val result = addListener(innerTimelineListener)
|
||||
result
|
||||
.onSuccess { timelineItems ->
|
||||
val matrixTimelineItems = timelineItems.map(timelineItemFactory::map)
|
||||
withContext(coroutineDispatchers.diffUpdateDispatcher) {
|
||||
this@RustMatrixTimeline.timelineItems.value = matrixTimelineItems
|
||||
}
|
||||
isInit.set(true)
|
||||
}
|
||||
.onFailure {
|
||||
Timber.e("Failed adding timeline listener on room with identifier: ${matrixRoom.roomId})")
|
||||
}
|
||||
}
|
||||
internal suspend fun postItems(items: List<TimelineItem>) {
|
||||
timelineDiffProcessor.postItems(items)
|
||||
}
|
||||
|
||||
override fun dispose() {
|
||||
Timber.v("Dispose timeline for room ${matrixRoom.roomId}")
|
||||
listenerTokens.dispose()
|
||||
isInit.set(false)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param message markdown message
|
||||
*/
|
||||
override suspend fun sendMessage(message: String): Result<Unit> {
|
||||
return matrixRoom.sendMessage(message)
|
||||
}
|
||||
|
||||
override suspend fun editMessage(originalEventId: EventId, message: String): Result<Unit> {
|
||||
return matrixRoom.editMessage(originalEventId, message = message)
|
||||
}
|
||||
|
||||
override suspend fun replyMessage(inReplyToEventId: EventId, message: String): Result<Unit> {
|
||||
return matrixRoom.replyMessage(inReplyToEventId, message)
|
||||
internal suspend fun postDiff(timelineDiff: TimelineDiff) {
|
||||
timelineDiffProcessor.postDiff(timelineDiff)
|
||||
}
|
||||
|
||||
override suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> = withContext(coroutineDispatchers.io) {
|
||||
|
|
@ -139,9 +95,6 @@ class RustMatrixTimeline(
|
|||
override suspend fun paginateBackwards(requestSize: Int, untilNumberOfItems: Int): Result<Unit> = withContext(coroutineDispatchers.io) {
|
||||
runCatching {
|
||||
Timber.v("Start back paginating for room ${matrixRoom.roomId} ")
|
||||
if (!isInit.get()) {
|
||||
throw IllegalStateException("Timeline is not init yet")
|
||||
}
|
||||
val paginationOptions = PaginationOptions.UntilNumItems(
|
||||
eventLimit = requestSize.toUShort(),
|
||||
items = untilNumberOfItems.toUShort(),
|
||||
|
|
@ -154,31 +107,4 @@ class RustMatrixTimeline(
|
|||
Timber.v("Success back paginating for room ${matrixRoom.roomId}")
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun addListener(timelineListener: TimelineListener): Result<List<TimelineItem>> = withContext(coroutineDispatchers.io) {
|
||||
runCatching {
|
||||
val settings = RoomSubscription(
|
||||
requiredState = listOf(
|
||||
RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_TOPIC, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_JOIN_RULES, value = ""),
|
||||
RequiredState(key = EventType.STATE_ROOM_POWER_LEVELS, value = ""),
|
||||
),
|
||||
timelineLimit = null
|
||||
)
|
||||
slidingSyncRoom.subscribeToRoom(settings)
|
||||
val result = slidingSyncRoom.addTimelineListener(timelineListener)
|
||||
launch {
|
||||
fetchMembers()
|
||||
}
|
||||
listenerTokens += result.taskHandle
|
||||
result.items
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun fetchMembers() = withContext(coroutineDispatchers.io) {
|
||||
runCatching {
|
||||
innerRoom.fetchMembers()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright (c) 2023 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.timeline
|
||||
|
||||
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.trySendBlocking
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.buffer
|
||||
import org.matrix.rustcomponents.sdk.Room
|
||||
import org.matrix.rustcomponents.sdk.TimelineDiff
|
||||
import org.matrix.rustcomponents.sdk.TimelineItem
|
||||
import org.matrix.rustcomponents.sdk.TimelineListener
|
||||
|
||||
internal fun Room.timelineDiffFlow(onInitialList: suspend (List<TimelineItem>) -> Unit): Flow<TimelineDiff> =
|
||||
mxCallbackFlow {
|
||||
val listener = object : TimelineListener {
|
||||
override fun onUpdate(diff: TimelineDiff) {
|
||||
trySendBlocking(diff)
|
||||
}
|
||||
}
|
||||
val result = addTimelineListener(listener)
|
||||
onInitialList(result.items)
|
||||
result.itemsStream
|
||||
}.buffer(Channel.UNLIMITED)
|
||||
|
|
@ -36,6 +36,7 @@ import io.element.android.libraries.matrix.test.notification.FakeNotificationSer
|
|||
import io.element.android.libraries.matrix.test.pushers.FakePushersService
|
||||
import io.element.android.libraries.matrix.test.room.FakeMatrixRoom
|
||||
import io.element.android.libraries.matrix.test.room.FakeRoomSummaryDataSource
|
||||
import io.element.android.libraries.matrix.test.sync.FakeSyncService
|
||||
import io.element.android.libraries.matrix.test.verification.FakeSessionVerificationService
|
||||
import kotlinx.coroutines.delay
|
||||
|
||||
|
|
@ -44,11 +45,11 @@ class FakeMatrixClient(
|
|||
private val userDisplayName: Result<String> = Result.success(A_USER_NAME),
|
||||
private val userAvatarURLString: Result<String> = Result.success(AN_AVATAR_URL),
|
||||
override val roomSummaryDataSource: RoomSummaryDataSource = FakeRoomSummaryDataSource(),
|
||||
override val invitesDataSource: RoomSummaryDataSource = FakeRoomSummaryDataSource(),
|
||||
override val mediaLoader: MatrixMediaLoader = FakeMediaLoader(),
|
||||
private val sessionVerificationService: FakeSessionVerificationService = FakeSessionVerificationService(),
|
||||
private val pushersService: FakePushersService = FakePushersService(),
|
||||
private val notificationService: FakeNotificationService = FakeNotificationService(),
|
||||
private val syncService: FakeSyncService = FakeSyncService(),
|
||||
) : MatrixClient {
|
||||
|
||||
private var ignoreUserResult: Result<Unit> = Result.success(Unit)
|
||||
|
|
@ -98,9 +99,7 @@ class FakeMatrixClient(
|
|||
return searchUserResults[searchTerm] ?: Result.failure(IllegalStateException("No response defined for $searchTerm"))
|
||||
}
|
||||
|
||||
override fun startSync() = Unit
|
||||
|
||||
override fun stopSync() = Unit
|
||||
override fun syncService() = syncService
|
||||
|
||||
override suspend fun getCacheSize(): Long {
|
||||
return 0
|
||||
|
|
@ -138,8 +137,6 @@ class FakeMatrixClient(
|
|||
|
||||
override fun notificationService(): NotificationService = notificationService
|
||||
|
||||
override fun onSlidingSyncUpdate() {}
|
||||
|
||||
override fun roomMembershipObserver(): RoomMembershipObserver {
|
||||
return RoomMembershipObserver()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -134,6 +134,10 @@ class FakeMatrixRoom(
|
|||
return matrixTimeline
|
||||
}
|
||||
|
||||
override fun open(): Result<Unit> {
|
||||
return Result.success(Unit)
|
||||
}
|
||||
|
||||
override suspend fun userDisplayName(userId: UserId): Result<String?> = simulateLongTask {
|
||||
userDisplayNameResult
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,20 +23,38 @@ import kotlinx.coroutines.flow.StateFlow
|
|||
|
||||
class FakeRoomSummaryDataSource : RoomSummaryDataSource {
|
||||
|
||||
private val roomSummariesFlow = MutableStateFlow<List<RoomSummary>>(emptyList())
|
||||
private val allRoomSummariesFlow = MutableStateFlow<List<RoomSummary>>(emptyList())
|
||||
private val inviteRoomSummariesFlow = MutableStateFlow<List<RoomSummary>>(emptyList())
|
||||
private val allRoomsLoadingStateFlow = MutableStateFlow<RoomSummaryDataSource.LoadingState>(RoomSummaryDataSource.LoadingState.NotLoaded)
|
||||
|
||||
suspend fun postRoomSummary(roomSummaries: List<RoomSummary>) {
|
||||
roomSummariesFlow.emit(roomSummaries)
|
||||
suspend fun postAllRooms(roomSummaries: List<RoomSummary>) {
|
||||
allRoomSummariesFlow.emit(roomSummaries)
|
||||
}
|
||||
|
||||
override fun roomSummaries(): StateFlow<List<RoomSummary>> {
|
||||
return roomSummariesFlow
|
||||
suspend fun postInviteRooms(roomSummaries: List<RoomSummary>) {
|
||||
inviteRoomSummariesFlow.emit(roomSummaries)
|
||||
}
|
||||
|
||||
suspend fun postLoadingState(loadingState: RoomSummaryDataSource.LoadingState) {
|
||||
allRoomsLoadingStateFlow.emit(loadingState)
|
||||
}
|
||||
|
||||
override fun allRoomsLoadingState(): StateFlow<RoomSummaryDataSource.LoadingState> {
|
||||
return allRoomsLoadingStateFlow
|
||||
}
|
||||
|
||||
override fun allRooms(): StateFlow<List<RoomSummary>> {
|
||||
return allRoomSummariesFlow
|
||||
}
|
||||
|
||||
override fun inviteRooms(): StateFlow<List<RoomSummary>> {
|
||||
return inviteRoomSummariesFlow
|
||||
}
|
||||
|
||||
var latestSlidingSyncRange: IntRange? = null
|
||||
private set
|
||||
|
||||
override fun setSlidingSyncRange(range: IntRange) {
|
||||
override fun updateRoomListVisibleRange(range: IntRange) {
|
||||
latestSlidingSyncRange = range
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Copyright (c) 2023 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.test.sync
|
||||
|
||||
import io.element.android.libraries.matrix.api.sync.SyncService
|
||||
import io.element.android.libraries.matrix.api.sync.SyncState
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
|
||||
class FakeSyncService : SyncService {
|
||||
|
||||
private val syncStateFlow = MutableStateFlow(SyncState.Idle)
|
||||
|
||||
fun simulateError() {
|
||||
syncStateFlow.value = SyncState.InError
|
||||
}
|
||||
|
||||
override fun startSync(): Result<Unit> {
|
||||
syncStateFlow.value = SyncState.Syncing
|
||||
return Result.success(Unit)
|
||||
}
|
||||
|
||||
override fun stopSync(): Result<Unit> {
|
||||
syncStateFlow.value = SyncState.Terminated
|
||||
return Result.success(Unit)
|
||||
}
|
||||
|
||||
override val syncState: StateFlow<SyncState> = syncStateFlow
|
||||
}
|
||||
|
|
@ -31,7 +31,6 @@ class FakeMatrixTimeline(
|
|||
|
||||
private val paginationState: MutableStateFlow<MatrixTimeline.PaginationState> = MutableStateFlow(initialPaginationState)
|
||||
private val timelineItems: MutableStateFlow<List<MatrixTimelineItem>> = MutableStateFlow(initialTimelineItems)
|
||||
var isInitialized = false
|
||||
|
||||
fun updatePaginationState(update: (MatrixTimeline.PaginationState.() -> MatrixTimeline.PaginationState)) {
|
||||
paginationState.value = update(paginationState.value)
|
||||
|
|
@ -63,25 +62,6 @@ class FakeMatrixTimeline(
|
|||
return Result.success(Unit)
|
||||
}
|
||||
|
||||
override fun initialize() {
|
||||
isInitialized = true
|
||||
}
|
||||
|
||||
override fun dispose() {
|
||||
isInitialized = false
|
||||
}
|
||||
|
||||
override suspend fun sendMessage(message: String): Result<Unit> {
|
||||
return Result.success(Unit)
|
||||
}
|
||||
|
||||
override suspend fun editMessage(originalEventId: EventId, message: String): Result<Unit> {
|
||||
return Result.success(Unit)
|
||||
}
|
||||
|
||||
override suspend fun replyMessage(inReplyToEventId: EventId, message: String): Result<Unit> {
|
||||
return Result.success(Unit)
|
||||
}
|
||||
|
||||
override suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> {
|
||||
return Result.success(Unit)
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ class MainActivity : ComponentActivity() {
|
|||
RustMatrixAuthenticationService(
|
||||
context = applicationContext,
|
||||
baseDirectory = baseDirectory,
|
||||
coroutineScope = Singleton.appScope,
|
||||
appCoroutineScope = Singleton.appScope,
|
||||
coroutineDispatchers = Singleton.coroutineDispatchers,
|
||||
sessionStore = InMemorySessionStore(),
|
||||
clock = DefaultSystemClock()
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ class RoomListScreen(
|
|||
stateContentFormatter = StateContentFormatter(stringProvider),
|
||||
),
|
||||
sessionVerificationService = sessionVerificationService,
|
||||
networkMonitor = NetworkMonitorImpl(context),
|
||||
networkMonitor = NetworkMonitorImpl(context, Singleton.appScope),
|
||||
snackbarDispatcher = SnackbarDispatcher(),
|
||||
inviteStateDataSource = DefaultInviteStateDataSource(matrixClient, DefaultSeenInvitesStore(context), coroutineDispatchers),
|
||||
leaveRoomPresenter = LeaveRoomPresenterImpl(matrixClient, RoomMembershipObserver(), coroutineDispatchers)
|
||||
|
|
@ -81,13 +81,11 @@ class RoomListScreen(
|
|||
Singleton.appScope.launch {
|
||||
withContext(coroutineDispatchers.io) {
|
||||
matrixClient.getRoom(roomId)!!.use { room ->
|
||||
room.open()
|
||||
val timeline = room.timeline()
|
||||
|
||||
timeline.apply {
|
||||
// TODO This doesn't work reliably as initialize is asynchronous, and the timeline can't be used until it's finished
|
||||
initialize()
|
||||
paginateBackwards(20, 50)
|
||||
dispose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -109,10 +107,10 @@ class RoomListScreen(
|
|||
|
||||
DisposableEffect(Unit) {
|
||||
Timber.w("Start sync!")
|
||||
matrixClient.startSync()
|
||||
matrixClient.syncService().startSync()
|
||||
onDispose {
|
||||
Timber.w("Stop sync!")
|
||||
matrixClient.stopSync()
|
||||
matrixClient.syncService().stopSync()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue