Notification events resolving and rendering in batches (#4722)

- Use `NotiticationService.getNotifications()` function so we resolve the events in bulk.
- Added `NotifierResolverQueue` to group the notifications to resolve based on a debounce strategy.
- Batch rendering of these events as notifications.
This commit is contained in:
Jorge Martin Espinosa 2025-05-26 17:10:20 +02:00 committed by GitHub
parent f0c9f8294a
commit f455085e08
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
30 changed files with 882 additions and 523 deletions

View file

@ -11,10 +11,10 @@ import android.content.Context
import android.net.Uri
import androidx.core.content.FileProvider
import com.squareup.anvil.annotations.ContributesBinding
import io.element.android.libraries.core.extensions.flatMap
import io.element.android.libraries.core.log.logger.LoggerTag
import io.element.android.libraries.di.AppScope
import io.element.android.libraries.di.ApplicationContext
import io.element.android.libraries.di.SingleIn
import io.element.android.libraries.matrix.api.MatrixClient
import io.element.android.libraries.matrix.api.MatrixClientProvider
import io.element.android.libraries.matrix.api.core.EventId
@ -61,10 +61,14 @@ private val loggerTag = LoggerTag("DefaultNotifiableEventResolver", LoggerTag.No
* this pattern allow decoupling between the object responsible of displaying notifications and the matrix sdk.
*/
interface NotifiableEventResolver {
suspend fun resolveEvent(sessionId: SessionId, roomId: RoomId, eventId: EventId): Result<ResolvedPushEvent>
suspend fun resolveEvents(
sessionId: SessionId,
notificationEventRequests: List<NotificationEventRequest>
): Result<Map<NotificationEventRequest, Result<ResolvedPushEvent>>>
}
@ContributesBinding(AppScope::class)
@SingleIn(AppScope::class)
class DefaultNotifiableEventResolver @Inject constructor(
private val stringProvider: StringProvider,
private val clock: SystemClock,
@ -75,29 +79,34 @@ class DefaultNotifiableEventResolver @Inject constructor(
private val callNotificationEventResolver: CallNotificationEventResolver,
private val appPreferencesStore: AppPreferencesStore,
) : NotifiableEventResolver {
override suspend fun resolveEvent(sessionId: SessionId, roomId: RoomId, eventId: EventId): Result<ResolvedPushEvent> {
// Restore session
val client = matrixClientProvider.getOrRestore(sessionId).getOrNull() ?: return Result.failure(
ResolvingException("Unable to restore session for $sessionId")
)
val notificationService = client.notificationService()
val notificationData = notificationService.getNotification(
roomId = roomId,
eventId = eventId,
).onFailure {
Timber.tag(loggerTag.value).e(it, "Unable to resolve event: $eventId.")
override suspend fun resolveEvents(
sessionId: SessionId,
notificationEventRequests: List<NotificationEventRequest>
): Result<Map<NotificationEventRequest, Result<ResolvedPushEvent>>> {
Timber.d("Queueing notifications: $notificationEventRequests")
val client = matrixClientProvider.getOrRestore(sessionId).getOrElse {
return Result.failure(IllegalStateException("Couldn't get or restore client for session $sessionId"))
}
val ids = notificationEventRequests.groupBy { it.roomId }.mapValues { (_, value) -> value.map { it.eventId } }
// TODO this notificationData is not always valid at the moment, sometimes the Rust SDK can't fetch the matching event
return notificationData.flatMap {
if (it == null) {
Timber.tag(loggerTag.value).d("No notification data found for event $eventId")
return@flatMap Result.failure(ResolvingException("Unable to resolve event $eventId"))
} else {
Timber.tag(loggerTag.value).d("Found notification item for $eventId")
it.asNotifiableEvent(client, sessionId)
val notifications = client.notificationService().getNotifications(ids).mapCatching { map ->
map.mapValues { (_, notificationData) ->
notificationData.asNotifiableEvent(client, sessionId)
}
}
return Result.success(
notificationEventRequests.associate {
val notificationData = notifications.getOrNull()?.get(it.eventId)
if (notificationData != null) {
it to notificationData
} else {
// TODO once the SDK can actually return what went wrong, we should return it here instead of this generic error
it to Result.failure(ResolvingException("No notification data for ${it.roomId} - ${it.eventId}"))
}
}
)
}
private suspend fun NotificationData.asNotifiableEvent(

View file

@ -113,6 +113,11 @@ class DefaultNotificationDrawerManager @Inject constructor(
renderEvents(listOf(notifiableEvent))
}
suspend fun onNotifiableEventsReceived(notifiableEvents: List<NotifiableEvent>) {
val eventsToNotify = notifiableEvents.filter { !it.shouldIgnoreEventInRoom(appNavigationStateService.appNavigationState.value) }
renderEvents(eventsToNotify)
}
/**
* Clear all known message events for a [sessionId].
*/

View file

@ -30,8 +30,9 @@ class DefaultOnMissedCallNotificationHandler @Inject constructor(
// Resolve the event and add a notification for it, at this point it should no longer be a ringing one
val notificationData = matrixClientProvider.getOrRestore(sessionId).getOrNull()
?.notificationService()
?.getNotification(roomId, eventId)
?.getNotifications(mapOf(roomId to listOf(eventId)))
?.getOrNull()
?.get(eventId)
?: return
val notifiableEvent = callNotificationEventResolver.resolveEvent(

View file

@ -163,7 +163,7 @@ class NotificationBroadcastReceiverHandler @Inject constructor(
roomIsDm = room.isDm(),
outGoingMessage = true,
)
onNotifiableEventReceived.onNotifiableEventReceived(notifiableMessageEvent)
onNotifiableEventReceived.onNotifiableEventsReceived(listOf(notifiableMessageEvent))
if (threadId != null && replyToEventId != null) {
room.liveTimeline.replyMessage(
@ -181,9 +181,11 @@ class NotificationBroadcastReceiverHandler @Inject constructor(
)
}.onFailure {
Timber.e(it, "Failed to send smart reply message")
onNotifiableEventReceived.onNotifiableEventReceived(
notifiableMessageEvent.copy(
outGoingMessageFailed = true
onNotifiableEventReceived.onNotifiableEventsReceived(
listOf(
notifiableMessageEvent.copy(
outGoingMessageFailed = true
)
)
)
}

View file

@ -37,6 +37,7 @@ class DefaultNotificationDisplayer @Inject constructor(
return false
}
notificationManager.notify(tag, id, notification)
Timber.d("Notifying with tag: $tag, id: $id")
return true
}

View file

@ -0,0 +1,101 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.notifications
import io.element.android.libraries.di.AppScope
import io.element.android.libraries.di.SingleIn
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.push.impl.notifications.model.ResolvedPushEvent
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
import kotlin.time.Duration.Companion.milliseconds
/**
* This class is responsible for periodically batching notification requests and resolving them in a single call,
* so that we can avoid having to resolve each notification individually in the SDK.
*/
@OptIn(ExperimentalCoroutinesApi::class)
@SingleIn(AppScope::class)
class NotificationResolverQueue @Inject constructor(
private val notifiableEventResolver: NotifiableEventResolver,
private val appCoroutineScope: CoroutineScope,
) {
companion object {
private const val BATCH_WINDOW_MS = 250L
}
private val requestQueue = Channel<NotificationEventRequest>(capacity = 100)
private var currentProcessingJob: Job? = null
/**
* A flow that emits pairs of a list of notification event requests and a map of the resolved events.
* The map contains the original request as the key and the resolved event as the value.
*/
val results: SharedFlow<Pair<List<NotificationEventRequest>, Map<NotificationEventRequest, Result<ResolvedPushEvent>>>> = MutableSharedFlow()
/**
* Enqueues a notification event request to be resolved.
* The request will be processed in batches, so it may not be resolved immediately.
*
* @param request The notification event request to enqueue.
*/
suspend fun enqueue(request: NotificationEventRequest) {
// Cancel previous processing job if it exists, acting as a debounce operation
Timber.d("Cancelling job: $currentProcessingJob")
currentProcessingJob?.cancel()
// Enqueue the request and start a delayed processing job
requestQueue.send(request)
currentProcessingJob = processQueue()
Timber.d("Starting processing job for request: $request")
}
private fun processQueue() = appCoroutineScope.launch(SupervisorJob()) {
delay(BATCH_WINDOW_MS.milliseconds)
// If this job is still active (so this is the latest job), we launch a separate one that won't be cancelled when enqueueing new items
// to process the existing queued items.
appCoroutineScope.launch {
val groupedRequestsById = buildList {
while (!requestQueue.isEmpty) {
requestQueue.receiveCatching().getOrNull()?.let(this::add)
}
}.groupBy { it.sessionId }
val sessionIds = groupedRequestsById.keys
for (sessionId in sessionIds) {
val requests = groupedRequestsById[sessionId].orEmpty()
Timber.d("Fetching notifications for $sessionId: $requests. Pending requests: ${!requestQueue.isEmpty}")
// Resolving the events in parallel should improve performance since each session id will query a different Client
launch {
// No need for a Mutex since the SDK already has one internally
val notifications = notifiableEventResolver.resolveEvents(sessionId, requests).getOrNull().orEmpty()
(results as MutableSharedFlow).emit(requests to notifications)
}
}
}
}
}
data class NotificationEventRequest(
val sessionId: SessionId,
val roomId: RoomId,
val eventId: EventId,
val providerInfo: String,
)

View file

@ -12,12 +12,22 @@ import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
sealed interface ResolvedPushEvent {
data class Event(val notifiableEvent: NotifiableEvent) : ResolvedPushEvent
val sessionId: SessionId
val roomId: RoomId
val eventId: EventId
data class Event(val notifiableEvent: NotifiableEvent) : ResolvedPushEvent {
override val sessionId: SessionId = notifiableEvent.sessionId
override val roomId: RoomId = notifiableEvent.roomId
override val eventId: EventId = notifiableEvent.eventId
}
data class Redaction(
val sessionId: SessionId,
val roomId: RoomId,
override val sessionId: SessionId,
override val roomId: RoomId,
val redactedEventId: EventId,
val reason: String?,
) : ResolvedPushEvent
) : ResolvedPushEvent {
override val eventId: EventId = redactedEventId
}
}

View file

@ -13,6 +13,7 @@ import io.element.android.features.call.api.ElementCallEntryPoint
import io.element.android.libraries.core.log.logger.LoggerTag
import io.element.android.libraries.core.meta.BuildMeta
import io.element.android.libraries.di.AppScope
import io.element.android.libraries.di.SingleIn
import io.element.android.libraries.matrix.api.auth.MatrixAuthenticationService
import io.element.android.libraries.push.impl.history.PushHistoryService
import io.element.android.libraries.push.impl.history.onDiagnosticPush
@ -20,8 +21,10 @@ import io.element.android.libraries.push.impl.history.onInvalidPushReceived
import io.element.android.libraries.push.impl.history.onSuccess
import io.element.android.libraries.push.impl.history.onUnableToResolveEvent
import io.element.android.libraries.push.impl.history.onUnableToRetrieveSession
import io.element.android.libraries.push.impl.notifications.NotifiableEventResolver
import io.element.android.libraries.push.impl.notifications.NotificationEventRequest
import io.element.android.libraries.push.impl.notifications.NotificationResolverQueue
import io.element.android.libraries.push.impl.notifications.channels.NotificationChannels
import io.element.android.libraries.push.impl.notifications.model.NotifiableEvent
import io.element.android.libraries.push.impl.notifications.model.NotifiableRingingCallEvent
import io.element.android.libraries.push.impl.notifications.model.ResolvedPushEvent
import io.element.android.libraries.push.impl.test.DefaultTestPush
@ -30,17 +33,21 @@ import io.element.android.libraries.pushproviders.api.PushData
import io.element.android.libraries.pushproviders.api.PushHandler
import io.element.android.libraries.pushstore.api.UserPushStoreFactory
import io.element.android.libraries.pushstore.api.clientsecret.PushClientSecret
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
private val loggerTag = LoggerTag("PushHandler", LoggerTag.PushLoggerTag)
@SingleIn(AppScope::class)
@ContributesBinding(AppScope::class)
class DefaultPushHandler @Inject constructor(
private val onNotifiableEventReceived: OnNotifiableEventReceived,
private val onRedactedEventReceived: OnRedactedEventReceived,
private val notifiableEventResolver: NotifiableEventResolver,
private val incrementPushDataStore: IncrementPushDataStore,
private val userPushStoreFactory: UserPushStoreFactory,
private val pushClientSecret: PushClientSecret,
@ -50,7 +57,85 @@ class DefaultPushHandler @Inject constructor(
private val elementCallEntryPoint: ElementCallEntryPoint,
private val notificationChannels: NotificationChannels,
private val pushHistoryService: PushHistoryService,
private val resolverQueue: NotificationResolverQueue,
private val appCoroutineScope: CoroutineScope,
) : PushHandler {
init {
processPushEventResults()
}
/**
* Process the push notification event results emitted by the [resolverQueue].
*/
private fun processPushEventResults() {
resolverQueue.results
.map { (requests, resolvedEvents) ->
for (request in requests) {
// Log the result of the push notification event
val result = resolvedEvents[request]
if (result?.isSuccess == true) {
pushHistoryService.onSuccess(
providerInfo = request.providerInfo,
eventId = request.eventId,
roomId = request.roomId,
sessionId = request.sessionId,
comment = "Push handled successfully",
)
} else {
pushHistoryService.onUnableToResolveEvent(
providerInfo = request.providerInfo,
eventId = request.eventId,
roomId = request.roomId,
sessionId = request.sessionId,
reason = "Push not handled",
)
}
}
val events = mutableListOf<NotifiableEvent>()
val redactions = mutableListOf<ResolvedPushEvent.Redaction>()
@Suppress("LoopWithTooManyJumpStatements")
for (result in resolvedEvents.values) {
val event = result.getOrNull() ?: continue
val userPushStore = userPushStoreFactory.getOrCreate(event.sessionId)
val areNotificationsEnabled = userPushStore.getNotificationEnabledForDevice().first()
// If notifications are disabled for this session and device, we don't want to show the notification
// But if it's a ringing call, we want to show it anyway
val isRingingCall = (event as? ResolvedPushEvent.Event)?.notifiableEvent is NotifiableRingingCallEvent
if (!areNotificationsEnabled && !isRingingCall) continue
// We categorise each result into either a NotifiableEvent or a Redaction
when (event) {
is ResolvedPushEvent.Event -> {
events.add(event.notifiableEvent)
}
is ResolvedPushEvent.Redaction -> {
redactions.add(event)
}
}
}
// Process redactions of messages
if (redactions.isNotEmpty()) {
onRedactedEventReceived.onRedactedEventsReceived(redactions)
}
// Find and process ringing call notifications separately
val (ringingCallEvents, nonRingingCallEvents) = events.partition { it is NotifiableRingingCallEvent }
for (ringingCallEvent in ringingCallEvents) {
Timber.tag(loggerTag.value).d("Ringing call event: $ringingCallEvent")
handleRingingCallEvent(ringingCallEvent as NotifiableRingingCallEvent)
}
// Finally, process other notifications (messages, invites, generic notifications, etc.)
if (nonRingingCallEvents.isNotEmpty()) {
onNotifiableEventReceived.onNotifiableEventsReceived(nonRingingCallEvents)
}
}
.launchIn(appCoroutineScope)
}
/**
* Called when message is received.
*
@ -119,52 +204,17 @@ class DefaultPushHandler @Inject constructor(
)
return
}
notifiableEventResolver.resolveEvent(userId, pushData.roomId, pushData.eventId).fold(
onSuccess = { resolvedPushEvent ->
pushHistoryService.onSuccess(
providerInfo = providerInfo,
eventId = pushData.eventId,
roomId = pushData.roomId,
sessionId = userId,
comment = resolvedPushEvent.javaClass.simpleName,
)
when (resolvedPushEvent) {
is ResolvedPushEvent.Event -> {
when (val notifiableEvent = resolvedPushEvent.notifiableEvent) {
is NotifiableRingingCallEvent -> {
Timber.tag(loggerTag.value).d("Notifiable event ${pushData.eventId} is ringing call: $notifiableEvent")
onNotifiableEventReceived.onNotifiableEventReceived(notifiableEvent)
handleRingingCallEvent(notifiableEvent)
}
else -> {
Timber.tag(loggerTag.value).d("Notifiable event ${pushData.eventId} is normal event: $notifiableEvent")
val userPushStore = userPushStoreFactory.getOrCreate(userId)
val areNotificationsEnabled = userPushStore.getNotificationEnabledForDevice().first()
if (areNotificationsEnabled) {
onNotifiableEventReceived.onNotifiableEventReceived(notifiableEvent)
} else {
Timber.tag(loggerTag.value).i("Notification are disabled for this device, ignore push.")
}
}
}
}
is ResolvedPushEvent.Redaction -> {
onRedactedEventReceived.onRedactedEventReceived(resolvedPushEvent)
}
}
},
onFailure = { failure ->
Timber.tag(loggerTag.value).w(failure, "Unable to get a notification data")
pushHistoryService.onUnableToResolveEvent(
providerInfo = providerInfo,
eventId = pushData.eventId,
roomId = pushData.roomId,
sessionId = userId,
reason = failure.message ?: failure.javaClass.simpleName,
)
}
)
appCoroutineScope.launch {
val notificationEventRequest = NotificationEventRequest(
sessionId = userId,
roomId = pushData.roomId,
eventId = pushData.eventId,
providerInfo = providerInfo,
)
Timber.d("Queueing notification: $notificationEventRequest")
resolverQueue.enqueue(notificationEventRequest)
}
} catch (e: Exception) {
Timber.tag(loggerTag.value).e(e, "## handleInternal() failed")
}

View file

@ -17,7 +17,7 @@ import kotlinx.coroutines.launch
import javax.inject.Inject
interface OnNotifiableEventReceived {
fun onNotifiableEventReceived(notifiableEvent: NotifiableEvent)
fun onNotifiableEventsReceived(notifiableEvents: List<NotifiableEvent>)
}
@ContributesBinding(AppScope::class)
@ -26,12 +26,10 @@ class DefaultOnNotifiableEventReceived @Inject constructor(
private val coroutineScope: CoroutineScope,
private val syncOnNotifiableEvent: SyncOnNotifiableEvent,
) : OnNotifiableEventReceived {
override fun onNotifiableEventReceived(notifiableEvent: NotifiableEvent) {
override fun onNotifiableEventsReceived(notifiableEvents: List<NotifiableEvent>) {
coroutineScope.launch {
launch { syncOnNotifiableEvent(notifiableEvent) }
if (notifiableEvent !is NotifiableRingingCallEvent) {
defaultNotificationDrawerManager.onNotifiableEventReceived(notifiableEvent)
}
launch { syncOnNotifiableEvent(notifiableEvents) }
defaultNotificationDrawerManager.onNotifiableEventsReceived(notifiableEvents.filter { it !is NotifiableRingingCallEvent })
}
}
}

View file

@ -29,7 +29,7 @@ import timber.log.Timber
import javax.inject.Inject
interface OnRedactedEventReceived {
fun onRedactedEventReceived(redaction: ResolvedPushEvent.Redaction)
fun onRedactedEventsReceived(redactions: List<ResolvedPushEvent.Redaction>)
}
@ContributesBinding(AppScope::class)
@ -40,48 +40,54 @@ class DefaultOnRedactedEventReceived @Inject constructor(
@ApplicationContext private val context: Context,
private val stringProvider: StringProvider,
) : OnRedactedEventReceived {
override fun onRedactedEventReceived(redaction: ResolvedPushEvent.Redaction) {
override fun onRedactedEventsReceived(redactions: List<ResolvedPushEvent.Redaction>) {
coroutineScope.launch {
val notifications = activeNotificationsProvider.getMessageNotificationsForRoom(
redaction.sessionId,
redaction.roomId,
)
if (notifications.isEmpty()) {
Timber.d("No notifications found for redacted event")
val redactionsBySessionIdAndRoom = redactions.groupBy { redaction ->
redaction.sessionId to redaction.roomId
}
notifications.forEach { statusBarNotification ->
val notification = statusBarNotification.notification
val messagingStyle = MessagingStyle.extractMessagingStyleFromNotification(notification)
if (messagingStyle == null) {
Timber.w("Unable to retrieve messaging style from notification")
return@forEach
for ((keys, roomRedactions) in redactionsBySessionIdAndRoom) {
val (sessionId, roomId) = keys
val notifications = activeNotificationsProvider.getMessageNotificationsForRoom(
sessionId,
roomId,
)
if (notifications.isEmpty()) {
Timber.d("No notifications found for redacted event")
}
val messageToRedactIndex = messagingStyle.messages.indexOfFirst { message ->
message.extras.getString(DefaultNotificationCreator.MESSAGE_EVENT_ID) == redaction.redactedEventId.value
}
if (messageToRedactIndex == -1) {
Timber.d("Unable to find the message to remove from notification")
return@forEach
}
val oldMessage = messagingStyle.messages[messageToRedactIndex]
val content = buildSpannedString {
inSpans(StyleSpan(Typeface.ITALIC)) {
append(stringProvider.getString(CommonStrings.common_message_removed))
notifications.forEach { statusBarNotification ->
val notification = statusBarNotification.notification
val messagingStyle = MessagingStyle.extractMessagingStyleFromNotification(notification)
if (messagingStyle == null) {
Timber.w("Unable to retrieve messaging style from notification")
return@forEach
}
val messageToRedactIndex = messagingStyle.messages.indexOfFirst { message ->
roomRedactions.any { it.redactedEventId.value == message.extras.getString(DefaultNotificationCreator.MESSAGE_EVENT_ID) }
}
if (messageToRedactIndex == -1) {
Timber.d("Unable to find the message to remove from notification")
return@forEach
}
val oldMessage = messagingStyle.messages[messageToRedactIndex]
val content = buildSpannedString {
inSpans(StyleSpan(Typeface.ITALIC)) {
append(stringProvider.getString(CommonStrings.common_message_removed))
}
}
val newMessage = MessagingStyle.Message(
content,
oldMessage.timestamp,
oldMessage.person
)
messagingStyle.messages[messageToRedactIndex] = newMessage
notificationDisplayer.showNotificationMessage(
statusBarNotification.tag,
statusBarNotification.id,
NotificationCompat.Builder(context, notification)
.setStyle(messagingStyle)
.build()
)
}
val newMessage = MessagingStyle.Message(
content,
oldMessage.timestamp,
oldMessage.person
)
messagingStyle.messages[messageToRedactIndex] = newMessage
notificationDisplayer.showNotificationMessage(
statusBarNotification.tag,
statusBarNotification.id,
NotificationCompat.Builder(context, notification)
.setStyle(messagingStyle)
.build()
)
}
}
}

View file

@ -8,15 +8,14 @@
package io.element.android.libraries.push.impl.push
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.core.coroutine.parallelMap
import io.element.android.libraries.featureflag.api.FeatureFlagService
import io.element.android.libraries.featureflag.api.FeatureFlags
import io.element.android.libraries.matrix.api.MatrixClientProvider
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.room.BaseRoom
import io.element.android.libraries.matrix.api.room.JoinedRoom
import io.element.android.libraries.matrix.api.timeline.MatrixTimelineItem
import io.element.android.libraries.push.impl.notifications.model.NotifiableEvent
import io.element.android.libraries.push.impl.notifications.model.NotifiableRingingCallEvent
import io.element.android.services.appnavstate.api.ActiveRoomsHolder
import io.element.android.services.appnavstate.api.AppForegroundStateService
import kotlinx.coroutines.flow.first
@ -33,55 +32,42 @@ class SyncOnNotifiableEvent @Inject constructor(
private val dispatchers: CoroutineDispatchers,
private val activeRoomsHolder: ActiveRoomsHolder,
) {
suspend operator fun invoke(notifiableEvent: NotifiableEvent) = withContext(dispatchers.io) {
val isRingingCallEvent = notifiableEvent is NotifiableRingingCallEvent
if (!featureFlagService.isFeatureEnabled(FeatureFlags.SyncOnPush) && !isRingingCallEvent) {
suspend operator fun invoke(notifiableEvents: List<NotifiableEvent>) = withContext(dispatchers.io) {
if (!featureFlagService.isFeatureEnabled(FeatureFlags.SyncOnPush)) {
return@withContext
}
val activeRoom = activeRoomsHolder.getActiveRoomMatching(notifiableEvent.sessionId, notifiableEvent.roomId)
try {
val eventsBySession = notifiableEvents.groupBy { it.sessionId }
if (activeRoom != null) {
// If the room is already active, we can use it directly
activeRoom.subscribeToSyncAndWait(notifiableEvent, isRingingCallEvent)
} else {
// Otherwise, we need to get the room from the matrix client
val room = matrixClientProvider
.getOrRestore(notifiableEvent.sessionId)
.mapCatching { it.getJoinedRoom(notifiableEvent.roomId) }
.getOrNull()
appForegroundStateService.updateIsSyncingNotificationEvent(true)
room?.use { it.subscribeToSyncAndWait(notifiableEvent, isRingingCallEvent) }
}
}
for ((sessionId, events) in eventsBySession) {
val client = matrixClientProvider.getOrRestore(sessionId).getOrNull() ?: continue
val eventsByRoomId = events.groupBy { it.roomId }
private suspend fun JoinedRoom.subscribeToSyncAndWait(notifiableEvent: NotifiableEvent, isRingingCallEvent: Boolean) {
subscribeToSync()
client.roomListService.subscribeToVisibleRooms(eventsByRoomId.keys.toList())
// If the app is in foreground, sync is already running, so we just add the subscription above.
if (!appForegroundStateService.isInForeground.value) {
if (isRingingCallEvent) {
waitsUntilUserIsInTheCall(timeout = 60.seconds)
} else {
try {
appForegroundStateService.updateIsSyncingNotificationEvent(true)
waitsUntilEventIsKnown(eventId = notifiableEvent.eventId, timeout = 10.seconds)
} finally {
appForegroundStateService.updateIsSyncingNotificationEvent(false)
if (!appForegroundStateService.isInForeground.value) {
for ((roomId, eventsInRoom) in eventsByRoomId) {
val activeRoom = activeRoomsHolder.getActiveRoomMatching(sessionId, roomId)
val room = activeRoom ?: client.getJoinedRoom(roomId)
if (room != null) {
eventsInRoom.parallelMap { event ->
room.waitsUntilEventIsKnown(event.eventId, timeout = 10.seconds)
}
}
if (room != null && activeRoom == null) {
// Destroy the room we just instantiated to reset its live timeline
room.destroy()
}
}
}
}
}
}
/**
* User can be in the call if they answer using another session.
* If the user does not join the call, the timeout will be reached.
*/
private suspend fun BaseRoom.waitsUntilUserIsInTheCall(timeout: Duration) {
withTimeoutOrNull(timeout) {
roomInfoFlow.first {
sessionId in it.activeRoomCallParticipants
}
} finally {
appForegroundStateService.updateIsSyncingNotificationEvent(false)
}
}