Sync notifications using WorkManager (#5545)

* Initial implementation of notification sync using `WorkManager`

* Use custom `MetroWorkerFactory` to allow assisted injection in WorkManager Workers

* Add tests for `FetchNotificationWorker`. Create `FakeNotificationResolverQueue` to help testing.

* Add more tests, fix Konsist checks

* Add tests for `SyncNotificationWorkManagerRequest`

* Simplify `FakeNotificationResolverQueue`
This commit is contained in:
Jorge Martin Espinosa 2025-10-17 11:51:27 +02:00 committed by GitHub
parent f3d75ee85c
commit ebe94f873e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
38 changed files with 968 additions and 98 deletions

View file

@ -44,6 +44,7 @@ import io.element.android.libraries.matrix.api.timeline.item.event.TextMessageTy
import io.element.android.libraries.matrix.api.timeline.item.event.VideoMessageType
import io.element.android.libraries.matrix.api.timeline.item.event.VoiceMessageType
import io.element.android.libraries.matrix.ui.messages.toPlainText
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.impl.R
import io.element.android.libraries.push.impl.notifications.model.InviteNotifiableEvent
import io.element.android.libraries.push.impl.notifications.model.NotifiableMessageEvent
@ -95,7 +96,10 @@ class DefaultNotifiableEventResolver(
val client = matrixClientProvider.getOrRestore(sessionId).getOrElse {
return Result.failure(IllegalStateException("Couldn't get or restore client for session $sessionId"))
}
val ids = notificationEventRequests.groupBy { it.roomId }.mapValues { (_, value) -> value.map { it.eventId } }
val ids = notificationEventRequests.groupBy { it.roomId }
.mapValues { (_, requests) ->
requests.map { it.eventId }
}
// TODO this notificationData is not always valid at the moment, sometimes the Rust SDK can't fetch the matching event
val notificationsResult = client.notificationService.getNotifications(ids)

View file

@ -8,13 +8,16 @@
package io.element.android.libraries.push.impl.notifications
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.ContributesBinding
import dev.zacsweers.metro.Inject
import dev.zacsweers.metro.SingleIn
import io.element.android.libraries.di.annotations.AppCoroutineScope
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.featureflag.api.FeatureFlagService
import io.element.android.libraries.featureflag.api.FeatureFlags
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.impl.notifications.model.ResolvedPushEvent
import io.element.android.libraries.push.impl.workmanager.SyncNotificationWorkManagerRequest
import io.element.android.libraries.workmanager.api.WorkManagerScheduler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
@ -27,18 +30,26 @@ import kotlinx.coroutines.launch
import timber.log.Timber
import kotlin.time.Duration.Companion.milliseconds
interface NotificationResolverQueue {
val results: SharedFlow<Pair<List<NotificationEventRequest>, Map<NotificationEventRequest, Result<ResolvedPushEvent>>>>
suspend fun enqueue(request: NotificationEventRequest)
}
/**
* This class is responsible for periodically batching notification requests and resolving them in a single call,
* so that we can avoid having to resolve each notification individually in the SDK.
*/
@OptIn(ExperimentalCoroutinesApi::class)
@SingleIn(AppScope::class)
@ContributesBinding(AppScope::class)
@Inject
class NotificationResolverQueue(
class DefaultNotificationResolverQueue(
private val notifiableEventResolver: NotifiableEventResolver,
@AppCoroutineScope
private val appCoroutineScope: CoroutineScope,
) {
private val workManagerScheduler: WorkManagerScheduler,
private val featureFlagService: FeatureFlagService,
) : NotificationResolverQueue {
companion object {
private const val BATCH_WINDOW_MS = 250L
}
@ -50,7 +61,7 @@ class NotificationResolverQueue(
* A flow that emits pairs of a list of notification event requests and a map of the resolved events.
* The map contains the original request as the key and the resolved event as the value.
*/
val results: SharedFlow<Pair<List<NotificationEventRequest>, Map<NotificationEventRequest, Result<ResolvedPushEvent>>>> = MutableSharedFlow()
override val results = MutableSharedFlow<Pair<List<NotificationEventRequest>, Map<NotificationEventRequest, Result<ResolvedPushEvent>>>>()
/**
* Enqueues a notification event request to be resolved.
@ -58,7 +69,7 @@ class NotificationResolverQueue(
*
* @param request The notification event request to enqueue.
*/
suspend fun enqueue(request: NotificationEventRequest) {
override suspend fun enqueue(request: NotificationEventRequest) {
// Cancel previous processing job if it exists, acting as a debounce operation
Timber.d("Cancelling job: $currentProcessingJob")
currentProcessingJob?.cancel()
@ -77,28 +88,27 @@ class NotificationResolverQueue(
appCoroutineScope.launch {
val groupedRequestsById = buildList {
while (!requestQueue.isEmpty) {
requestQueue.receiveCatching().getOrNull()?.let(this::add)
requestQueue.receiveCatching().getOrNull()?.let(::add)
}
}.groupBy { it.sessionId }
val sessionIds = groupedRequestsById.keys
for (sessionId in sessionIds) {
val requests = groupedRequestsById[sessionId].orEmpty()
Timber.d("Fetching notifications for $sessionId: $requests. Pending requests: ${!requestQueue.isEmpty}")
// Resolving the events in parallel should improve performance since each session id will query a different Client
launch {
// No need for a Mutex since the SDK already has one internally
val notifications = notifiableEventResolver.resolveEvents(sessionId, requests).getOrNull().orEmpty()
(results as MutableSharedFlow).emit(requests to notifications)
if (featureFlagService.isFeatureEnabled(FeatureFlags.SyncNotificationsWithWorkManager)) {
for ((sessionId, requests) in groupedRequestsById) {
workManagerScheduler.submit(SyncNotificationWorkManagerRequest(sessionId, requests))
}
} else {
val sessionIds = groupedRequestsById.keys
for (sessionId in sessionIds) {
val requests = groupedRequestsById[sessionId].orEmpty()
Timber.d("Fetching notifications for $sessionId: $requests. Pending requests: ${!requestQueue.isEmpty}")
// Resolving the events in parallel should improve performance since each session id will query a different Client
launch {
// No need for a Mutex since the SDK already has one internally
val notifications = notifiableEventResolver.resolveEvents(sessionId, requests).getOrNull().orEmpty()
results.emit(requests to notifications)
}
}
}
}
}
}
data class NotificationEventRequest(
val sessionId: SessionId,
val roomId: RoomId,
val eventId: EventId,
val providerInfo: String,
)

View file

@ -16,7 +16,11 @@ import io.element.android.features.call.api.ElementCallEntryPoint
import io.element.android.libraries.core.log.logger.LoggerTag
import io.element.android.libraries.core.meta.BuildMeta
import io.element.android.libraries.di.annotations.AppCoroutineScope
import io.element.android.libraries.featureflag.api.FeatureFlagService
import io.element.android.libraries.featureflag.api.FeatureFlags
import io.element.android.libraries.matrix.api.exception.NotificationResolverException
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent
import io.element.android.libraries.push.impl.history.PushHistoryService
import io.element.android.libraries.push.impl.history.onDiagnosticPush
import io.element.android.libraries.push.impl.history.onInvalidPushReceived
@ -24,7 +28,6 @@ import io.element.android.libraries.push.impl.history.onSuccess
import io.element.android.libraries.push.impl.history.onUnableToResolveEvent
import io.element.android.libraries.push.impl.history.onUnableToRetrieveSession
import io.element.android.libraries.push.impl.notifications.FallbackNotificationFactory
import io.element.android.libraries.push.impl.notifications.NotificationEventRequest
import io.element.android.libraries.push.impl.notifications.NotificationResolverQueue
import io.element.android.libraries.push.impl.notifications.channels.NotificationChannels
import io.element.android.libraries.push.impl.notifications.model.FallbackNotifiableEvent
@ -65,6 +68,8 @@ class DefaultPushHandler(
@AppCoroutineScope
private val appCoroutineScope: CoroutineScope,
private val fallbackNotificationFactory: FallbackNotificationFactory,
private val syncOnNotifiableEvent: SyncOnNotifiableEvent,
private val featureFlagService: FeatureFlagService,
) : PushHandler {
init {
processPushEventResults()
@ -196,6 +201,10 @@ class DefaultPushHandler(
if (nonRingingCallEvents.isNotEmpty()) {
onNotifiableEventReceived.onNotifiableEventsReceived(nonRingingCallEvents)
}
if (!featureFlagService.isFeatureEnabled(FeatureFlags.SyncNotificationsWithWorkManager)) {
syncOnNotifiableEvent(requests)
}
}
.launchIn(appCoroutineScope)
}

View file

@ -7,41 +7,45 @@
package io.element.android.libraries.push.impl.push
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.ContributesBinding
import dev.zacsweers.metro.Inject
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.featureflag.api.FeatureFlagService
import io.element.android.libraries.featureflag.api.FeatureFlags
import io.element.android.libraries.matrix.api.MatrixClientProvider
import io.element.android.libraries.push.impl.notifications.model.NotifiableEvent
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent
import io.element.android.services.appnavstate.api.AppForegroundStateService
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import timber.log.Timber
import kotlin.time.Duration.Companion.seconds
@ContributesBinding(AppScope::class)
@Inject
class SyncOnNotifiableEvent(
class DefaultSyncOnNotifiableEvent(
private val matrixClientProvider: MatrixClientProvider,
private val featureFlagService: FeatureFlagService,
private val appForegroundStateService: AppForegroundStateService,
private val dispatchers: CoroutineDispatchers,
) {
suspend operator fun invoke(notifiableEvents: List<NotifiableEvent>) = withContext(dispatchers.io) {
) : SyncOnNotifiableEvent {
override suspend operator fun invoke(requests: List<NotificationEventRequest>) = withContext(dispatchers.io) {
if (!featureFlagService.isFeatureEnabled(FeatureFlags.SyncOnPush)) {
return@withContext
}
try {
val eventsBySession = notifiableEvents.groupBy { it.sessionId }
val eventsBySession = requests.groupBy { it.sessionId }
appForegroundStateService.updateIsSyncingNotificationEvent(true)
Timber.d("Starting opportunistic room list sync | In foreground: ${appForegroundStateService.isInForeground.value}")
for ((sessionId, events) in eventsBySession) {
val client = matrixClientProvider.getOrRestore(sessionId).getOrNull() ?: continue
val eventsByRoomId = events.groupBy { it.roomId }
val roomIds = events.map { it.roomId }.distinct()
client.roomListService.subscribeToVisibleRooms(eventsByRoomId.keys.toList())
client.roomListService.subscribeToVisibleRooms(roomIds)
if (!appForegroundStateService.isInForeground.value) {
// Give the sync some time to complete in background

View file

@ -27,11 +27,9 @@ class DefaultOnNotifiableEventReceived(
private val defaultNotificationDrawerManager: DefaultNotificationDrawerManager,
@AppCoroutineScope
private val coroutineScope: CoroutineScope,
private val syncOnNotifiableEvent: SyncOnNotifiableEvent,
) : OnNotifiableEventReceived {
override fun onNotifiableEventsReceived(notifiableEvents: List<NotifiableEvent>) {
coroutineScope.launch {
launch { syncOnNotifiableEvent(notifiableEvents) }
defaultNotificationDrawerManager.onNotifiableEventsReceived(notifiableEvents.filter { it !is NotifiableRingingCallEvent })
}
}

View file

@ -0,0 +1,123 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.workmanager
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.Assisted
import dev.zacsweers.metro.AssistedFactory
import dev.zacsweers.metro.AssistedInject
import dev.zacsweers.metro.ContributesIntoMap
import dev.zacsweers.metro.binding
import io.element.android.features.networkmonitor.api.NetworkMonitor
import io.element.android.features.networkmonitor.api.NetworkStatus
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.core.extensions.runCatchingExceptions
import io.element.android.libraries.di.annotations.ApplicationContext
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent
import io.element.android.libraries.push.impl.notifications.NotifiableEventResolver
import io.element.android.libraries.push.impl.notifications.NotificationResolverQueue
import io.element.android.libraries.workmanager.api.WorkManagerScheduler
import io.element.android.libraries.workmanager.api.di.MetroWorkerFactory
import io.element.android.libraries.workmanager.api.di.WorkerKey
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.serialization.json.Json
import timber.log.Timber
import kotlin.time.Duration.Companion.seconds
@AssistedInject
class FetchNotificationsWorker(
@Assisted workerParams: WorkerParameters,
@ApplicationContext context: Context,
private val networkMonitor: NetworkMonitor,
private val eventResolver: NotifiableEventResolver,
private val queue: NotificationResolverQueue,
private val workManagerScheduler: WorkManagerScheduler,
private val syncOnNotifiableEvent: SyncOnNotifiableEvent,
private val coroutineDispatchers: CoroutineDispatchers,
private val json: Json,
) : CoroutineWorker(context, workerParams) {
override suspend fun doWork(): Result = withContext(coroutineDispatchers.io) {
Timber.d("FetchNotificationsWorker started")
val rawRequestsJson = inputData.getString("requests") ?: return@withContext Result.failure()
val requests = runCatchingExceptions {
json.decodeFromString<List<SyncNotificationWorkManagerRequest.Data>>(rawRequestsJson).map { it.toRequest() }
}.getOrElse {
Timber.e(it, "Failed to deserialize notification requests")
return@withContext Result.failure()
}
Timber.d("Deserialized ${requests.size} requests")
// Wait for network to be available, but not more than 10 seconds
val hasNetwork = withTimeoutOrNull(10.seconds) {
networkMonitor.connectivity.first { it == NetworkStatus.Connected }
} != null
if (!hasNetwork) {
Timber.w("No network, retrying later")
return@withContext Result.retry()
}
val failedSyncForSessions = mutableSetOf<SessionId>()
val groupedRequests = requests.groupBy { it.sessionId }
for ((sessionId, notificationRequests) in groupedRequests) {
Timber.d("Processing notification requests for session $sessionId")
eventResolver.resolveEvents(sessionId, notificationRequests)
.fold(
onSuccess = { result ->
// Update the resolved results in the queue
(queue.results as MutableSharedFlow).emit(requests to result)
},
onFailure = {
failedSyncForSessions += sessionId
Timber.e(it, "Failed to resolve notification events for session $sessionId")
}
)
}
// If there were failures for whole sessions, we retry all their requests
if (failedSyncForSessions.isNotEmpty()) {
for (failedSessionId in failedSyncForSessions) {
val requestsToRetry = groupedRequests[failedSessionId] ?: continue
Timber.d("Re-scheduling ${requestsToRetry.size} failed notification requests for session $failedSessionId")
workManagerScheduler.submit(SyncNotificationWorkManagerRequest(failedSessionId, requestsToRetry))
}
}
Timber.d("Notifications processed successfully")
performOpportunisticSyncIfNeeded(groupedRequests)
Result.success()
}
private suspend fun performOpportunisticSyncIfNeeded(
groupedRequests: Map<SessionId, List<NotificationEventRequest>>,
) {
for ((sessionId, notificationRequests) in groupedRequests) {
runCatchingExceptions {
syncOnNotifiableEvent(notificationRequests)
}.onFailure {
Timber.e(it, "Failed to sync on notifiable events for session $sessionId")
}
}
}
@ContributesIntoMap(AppScope::class, binding = binding<MetroWorkerFactory.WorkerInstanceFactory<*>>())
@WorkerKey(FetchNotificationsWorker::class)
@AssistedFactory
abstract class Factory : MetroWorkerFactory.WorkerInstanceFactory<FetchNotificationsWorker>
}

View file

@ -0,0 +1,87 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.workmanager
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.OutOfQuotaPolicy
import androidx.work.WorkRequest
import androidx.work.workDataOf
import io.element.android.libraries.core.extensions.runCatchingExceptions
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.workmanager.api.WorkManagerRequest
import io.element.android.libraries.workmanager.api.WorkManagerRequestType
import io.element.android.libraries.workmanager.api.workManagerTag
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import timber.log.Timber
import java.security.InvalidParameterException
class SyncNotificationWorkManagerRequest(
private val sessionId: SessionId,
private val notificationEventRequests: List<NotificationEventRequest>,
) : WorkManagerRequest {
private val json = Json { ignoreUnknownKeys = true }
override fun build(): Result<WorkRequest> {
if (notificationEventRequests.isEmpty()) {
return Result.failure(InvalidParameterException("notificationEventRequests cannot be empty"))
}
val json = runCatchingExceptions { json.encodeToString(notificationEventRequests.map { it.toData() }) }
.getOrElse {
Timber.e(it, "Failed to serialize notification requests")
return Result.failure(it)
}
Timber.d("Scheduling ${notificationEventRequests.size} notification requests with WorkManager for $sessionId")
return Result.success(
OneTimeWorkRequestBuilder<FetchNotificationsWorker>()
.setInputData(workDataOf("requests" to json))
.setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST)
.setTraceTag(workManagerTag(sessionId, WorkManagerRequestType.NOTIFICATION_SYNC))
// TODO investigate using this instead of the resolver queue
// .setInputMerger()
.build()
)
}
@Serializable
data class Data(
@SerialName("session_id")
val sessionId: String,
@SerialName("room_id")
val roomId: String,
@SerialName("event_id")
val eventId: String,
@SerialName("provider_info")
val providerInfo: String,
) {
fun toRequest(): NotificationEventRequest {
return NotificationEventRequest(
sessionId = SessionId(sessionId),
roomId = RoomId(roomId),
eventId = EventId(eventId),
providerInfo = providerInfo,
)
}
}
}
private fun NotificationEventRequest.toData(): SyncNotificationWorkManagerRequest.Data {
return SyncNotificationWorkManagerRequest.Data(
sessionId = sessionId.value,
roomId = roomId.value,
eventId = eventId.value,
providerInfo = providerInfo,
)
}