Merge branch 'develop' into feature/fga/avoid_deadlocks

This commit is contained in:
ganfra 2023-07-25 16:09:24 +02:00
commit da57f42fcc
307 changed files with 2806 additions and 1172 deletions

View file

@ -16,8 +16,8 @@
package io.element.android.libraries.matrix.impl.auth
import io.element.android.libraries.matrix.api.auth.OidcConfig
// TODO Oidc
// import io.element.android.libraries.matrix.api.auth.OidcConfig
// import org.matrix.rustcomponents.sdk.OidcClientMetadata
/*

View file

@ -17,7 +17,6 @@
package io.element.android.libraries.matrix.impl.media
import io.element.android.libraries.matrix.api.media.ImageInfo
import org.matrix.rustcomponents.sdk.MediaSource
import org.matrix.rustcomponents.sdk.ImageInfo as RustImageInfo
fun RustImageInfo.map(): ImageInfo = ImageInfo(

View file

@ -40,9 +40,6 @@ import io.element.android.libraries.matrix.impl.core.toProgressWatcher
import io.element.android.libraries.matrix.impl.media.map
import io.element.android.libraries.matrix.impl.room.location.toInner
import io.element.android.libraries.matrix.impl.timeline.RustMatrixTimeline
import io.element.android.libraries.matrix.impl.timeline.backPaginationStatusFlow
import io.element.android.libraries.matrix.impl.timeline.eventOrigin
import io.element.android.libraries.matrix.impl.timeline.timelineDiffFlow
import io.element.android.libraries.sessionstorage.api.SessionData
import io.element.android.services.toolbox.api.systemclock.SystemClock
import kotlinx.coroutines.CoroutineScope
@ -51,11 +48,7 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.EventItemOrigin
import org.matrix.rustcomponents.sdk.RequiredState
import org.matrix.rustcomponents.sdk.Room
import org.matrix.rustcomponents.sdk.RoomListItem
@ -88,7 +81,6 @@ class RustMatrixRoom(
private val roomCoroutineScope = sessionCoroutineScope.childScope(coroutineDispatchers.main, "RoomScope-$roomId")
private val _membersStateFlow = MutableStateFlow<MatrixRoomMembersState>(MatrixRoomMembersState.Unknown)
private val isInit = MutableStateFlow(false)
private val _syncUpdateFlow = MutableStateFlow(0L)
private val _timeline by lazy {
RustMatrixTimeline(
@ -97,6 +89,7 @@ class RustMatrixRoom(
roomCoroutineScope = roomCoroutineScope,
dispatcher = roomDispatcher,
lastLoginTimestamp = sessionData.loginTimestamp,
onNewSyncedEvent = { _syncUpdateFlow.value = systemClock.epochMillis() }
)
}
@ -106,8 +99,7 @@ class RustMatrixRoom(
override val timeline: MatrixTimeline = _timeline
override fun open(): Result<Unit> {
if (isInit.value) return Result.failure(IllegalStateException("Listener already registered"))
override fun subscribeToSync() {
val settings = RoomSubscription(
requiredState = listOf(
RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""),
@ -118,37 +110,16 @@ class RustMatrixRoom(
timelineLimit = null
)
roomListItem.subscribe(settings)
roomCoroutineScope.launch(roomDispatcher) {
innerRoom.timelineDiffFlow { initialList ->
_timeline.postItems(initialList)
}.onEach { diff ->
diff.use {
if (diff.eventOrigin() == EventItemOrigin.SYNC) {
_syncUpdateFlow.value = systemClock.epochMillis()
}
_timeline.postDiff(diff)
}
}.launchIn(this)
innerRoom.backPaginationStatusFlow()
.onEach {
_timeline.postPaginationStatus(it)
}.launchIn(this)
fetchMembers()
}
isInit.value = true
return Result.success(Unit)
}
override fun close() {
if (isInit.value) {
isInit.value = false
roomCoroutineScope.cancel()
roomListItem.unsubscribe()
innerRoom.destroy()
roomListItem.destroy()
}
override fun unsubscribeFromSync() {
roomListItem.unsubscribe()
}
override fun destroy() {
roomCoroutineScope.cancel()
innerRoom.destroy()
roomListItem.destroy()
}
override val name: String?
@ -365,12 +336,6 @@ class RustMatrixRoom(
}
}
private suspend fun fetchMembers() = withContext(roomDispatcher) {
runCatching {
innerRoom.fetchMembers()
}
}
override suspend fun reportContent(eventId: EventId, reason: String, blockUserId: UserId?): Result<Unit> = withContext(roomDispatcher) {
runCatching {
innerRoom.reportContent(eventId = eventId.value, score = null, reason = reason)

View file

@ -39,10 +39,14 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.getAndUpdate
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.sample
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.BackPaginationStatus
import org.matrix.rustcomponents.sdk.EventItemOrigin
import org.matrix.rustcomponents.sdk.PaginationOptions
import org.matrix.rustcomponents.sdk.Room
import org.matrix.rustcomponents.sdk.TimelineDiff
@ -59,6 +63,7 @@ class RustMatrixTimeline(
private val innerRoom: Room,
private val dispatcher: CoroutineDispatcher,
private val lastLoginTimestamp: Date?,
private val onNewSyncedEvent: () -> Unit,
) : MatrixTimeline {
private val initLatch = CompletableDeferred<Unit>()
@ -95,13 +100,40 @@ class RustMatrixTimeline(
override val paginationState: StateFlow<MatrixTimeline.PaginationState> = _paginationState.asStateFlow()
init {
Timber.d("Initialize timeline for room ${matrixRoom.roomId}")
roomCoroutineScope.launch(dispatcher) {
innerRoom.timelineDiffFlow { initialList ->
postItems(initialList)
}.onEach { diff ->
if (diff.eventOrigin() == EventItemOrigin.SYNC) {
onNewSyncedEvent()
}
postDiff(diff)
}.launchIn(this)
innerRoom.backPaginationStatusFlow()
.onEach {
postPaginationStatus(it)
}.launchIn(this)
fetchMembers()
}
}
private suspend fun fetchMembers() = withContext(dispatcher) {
runCatching {
innerRoom.fetchMembers()
}
}
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
override val timelineItems: Flow<List<MatrixTimelineItem>> = _timelineItems.sample(50)
.mapLatest { items ->
encryptedHistoryPostProcessor.process(items)
}
internal suspend fun postItems(items: List<TimelineItem>) = coroutineScope {
private suspend fun postItems(items: List<TimelineItem>) = coroutineScope {
// Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap.
items.chunked(INITIAL_MAX_SIZE).reversed().forEach {
ensureActive()
@ -111,12 +143,12 @@ class RustMatrixTimeline(
initLatch.complete(Unit)
}
internal suspend fun postDiff(timelineDiff: TimelineDiff) {
private suspend fun postDiff(timelineDiff: TimelineDiff) {
initLatch.await()
timelineDiffProcessor.postDiff(timelineDiff)
}
internal fun postPaginationStatus(status: BackPaginationStatus) {
private fun postPaginationStatus(status: BackPaginationStatus) {
_paginationState.getAndUpdate { currentPaginationState ->
if (hasEncryptionHistoryBanner()) {
return@getAndUpdate currentPaginationState.copy(

View file

@ -45,7 +45,7 @@ class EventMessageMapper {
fun map(message: Message): MessageContent = message.use {
val type = it.msgtype().use(this::mapMessageType)
val inReplyToId = it.inReplyTo()?.eventId?.let(::EventId)
val inReplyToEvent: InReplyTo? = (it.inReplyTo()?.event)?.use { details ->
val inReplyToEvent: InReplyTo? = it.inReplyTo()?.event?.use { details ->
when (details) {
is RepliedToEventDetails.Ready -> {
val senderProfile = details.senderProfile as? ProfileDetails.Ready

View file

@ -22,7 +22,6 @@ import io.element.android.libraries.matrix.api.timeline.item.virtual.VirtualTime
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.getAndUpdate
import java.util.Date
import java.util.UUID
class TimelineEncryptedHistoryPostProcessor(
private val lastLoginTimestamp: Date?,
@ -70,5 +69,4 @@ class TimelineEncryptedHistoryPostProcessor(
val timestamp = (item as? MatrixTimelineItem.Event)?.event?.timestamp ?: return false
return timestamp <= lastLoginTimestamp!!.time
}
}