Merge branch 'develop' into valere/rtc/voice_call

This commit is contained in:
Valere 2026-03-04 13:46:54 +01:00
commit 0e3722e52e
187 changed files with 2612 additions and 2314 deletions

View file

@ -14,13 +14,16 @@ import android.os.PowerManager
import androidx.core.content.getSystemService
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.ContributesBinding
import io.element.android.libraries.core.extensions.runCatchingExceptions
import io.element.android.libraries.di.annotations.ApplicationContext
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.push.impl.PushDatabase
import io.element.android.libraries.push.impl.db.PushHistory
import io.element.android.libraries.push.impl.db.PushRequest
import io.element.android.services.toolbox.api.systemclock.SystemClock
import kotlin.time.Instant
@ContributesBinding(AppScope::class)
class DefaultPushHistoryService(
@ -31,7 +34,37 @@ class DefaultPushHistoryService(
private val powerManager = context.getSystemService<PowerManager>()
private val packageName = context.packageName
override fun onPushReceived(
override suspend fun insertOrUpdatePushRequest(pushRequest: PushRequest): Result<Unit> {
return runCatchingExceptions { pushDatabase.pushRequestQueries.insertPushRequest(pushRequest).await() }
}
override suspend fun insertOrUpdatePushRequests(pushRequests: List<PushRequest>): Result<Unit> {
return runCatchingExceptions {
pushDatabase.transaction {
for (request in pushRequests) {
pushDatabase.pushRequestQueries.insertPushRequest(request)
}
}
}
}
override suspend fun getPendingPushRequests(sessionId: SessionId, since: Instant?): Result<List<PushRequest>> {
return runCatchingExceptions {
pushDatabase.transactionWithResult {
val sinceTimeMillis = since?.toEpochMilliseconds() ?: 0
pushDatabase.pushRequestQueries.selectAllPendingForSession(sessionId.value, sinceTimeMillis).executeAsList()
}
}
}
override suspend fun removeOldPushRequests(sessionId: SessionId): Result<Unit> {
return runCatchingExceptions {
val keepAmount = 100L
pushDatabase.pushRequestQueries.removeOldest(keepAmount)
}
}
override fun onPushResult(
providerInfo: String,
eventId: EventId?,
roomId: RoomId?,

View file

@ -11,13 +11,16 @@ package io.element.android.libraries.push.impl.history
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.push.impl.db.PushRequest
import io.element.android.libraries.push.impl.push.PushRequestStatus
import kotlin.time.Instant
interface PushHistoryService {
/**
* Create a new push history entry.
* Do not use directly, prefer using the extension functions.
*/
fun onPushReceived(
fun onPushResult(
providerInfo: String,
eventId: EventId?,
roomId: RoomId?,
@ -26,12 +29,33 @@ interface PushHistoryService {
includeDeviceState: Boolean,
comment: String?,
)
/**
* Adds or replaces an existing [PushRequest] in the local database.
*/
suspend fun insertOrUpdatePushRequest(pushRequest: PushRequest): Result<Unit>
/**
* Replace a list of [PushRequest] in the database.
*/
suspend fun insertOrUpdatePushRequests(pushRequests: List<PushRequest>): Result<Unit>
/**
* Gets [PushRequestStatus.PENDING] push requests from the local database for a [SessionId].
* A [since] param can optionally be provided to only return those received after that date.
*/
suspend fun getPendingPushRequests(sessionId: SessionId, since: Instant?): Result<List<PushRequest>>
/**
* Removes the oldest push requests for a [SessionId].
*/
suspend fun removeOldPushRequests(sessionId: SessionId): Result<Unit>
}
fun PushHistoryService.onInvalidPushReceived(
providerInfo: String,
data: String,
) = onPushReceived(
) = onPushResult(
providerInfo = providerInfo,
eventId = null,
roomId = null,
@ -46,7 +70,7 @@ fun PushHistoryService.onUnableToRetrieveSession(
eventId: EventId,
roomId: RoomId,
reason: String,
) = onPushReceived(
) = onPushResult(
providerInfo = providerInfo,
eventId = eventId,
roomId = roomId,
@ -62,7 +86,7 @@ fun PushHistoryService.onUnableToResolveEvent(
roomId: RoomId,
sessionId: SessionId,
reason: String,
) = onPushReceived(
) = onPushResult(
providerInfo = providerInfo,
eventId = eventId,
roomId = roomId,
@ -78,7 +102,7 @@ fun PushHistoryService.onSuccess(
roomId: RoomId,
sessionId: SessionId,
comment: String?,
) = onPushReceived(
) = onPushResult(
providerInfo = providerInfo,
eventId = eventId,
roomId = roomId,
@ -95,7 +119,7 @@ fun PushHistoryService.onSuccess(
fun PushHistoryService.onDiagnosticPush(
providerInfo: String,
) = onPushReceived(
) = onPushResult(
providerInfo = providerInfo,
eventId = null,
roomId = null,

View file

@ -50,8 +50,8 @@ import io.element.android.libraries.matrix.api.timeline.item.event.TextMessageTy
import io.element.android.libraries.matrix.api.timeline.item.event.VideoMessageType
import io.element.android.libraries.matrix.api.timeline.item.event.VoiceMessageType
import io.element.android.libraries.matrix.ui.messages.toPlainText
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.impl.R
import io.element.android.libraries.push.impl.db.PushRequest
import io.element.android.libraries.push.impl.notifications.model.InviteNotifiableEvent
import io.element.android.libraries.push.impl.notifications.model.NotifiableMessageEvent
import io.element.android.libraries.push.impl.notifications.model.ResolvedPushEvent
@ -64,10 +64,10 @@ private val loggerTag = LoggerTag("DefaultNotifiableEventResolver", LoggerTag.No
/**
* Result of resolving a batch of push events.
* The outermost [Result] indicates whether the setup to resolve the events was successful.
* The results for each push notification will be a map of [NotificationEventRequest] to [Result] of [ResolvedPushEvent].
* The results for each push notification will be a map of [PushRequest] to [Result] of [ResolvedPushEvent].
* If the resolution of a specific event fails, the innermost [Result] will contain an exception.
*/
typealias ResolvePushEventsResult = Result<Map<NotificationEventRequest, Result<ResolvedPushEvent>>>
typealias ResolvePushEventsResult = Result<Map<PushRequest, Result<ResolvedPushEvent>>>
/**
* The notifiable event resolver is able to create a NotifiableEvent (view model for notifications) from an sdk Event.
@ -78,7 +78,7 @@ typealias ResolvePushEventsResult = Result<Map<NotificationEventRequest, Result<
interface NotifiableEventResolver {
suspend fun resolveEvents(
sessionId: SessionId,
notificationEventRequests: List<NotificationEventRequest>
notificationEventRequests: List<PushRequest>
): ResolvePushEventsResult
}
@ -96,15 +96,15 @@ class DefaultNotifiableEventResolver(
) : NotifiableEventResolver {
override suspend fun resolveEvents(
sessionId: SessionId,
notificationEventRequests: List<NotificationEventRequest>
notificationEventRequests: List<PushRequest>
): ResolvePushEventsResult {
Timber.d("Queueing notifications: $notificationEventRequests")
val client = matrixClientProvider.getOrRestore(sessionId).getOrElse {
return Result.failure(it)
}
val ids = notificationEventRequests.groupBy { it.roomId }
val ids = notificationEventRequests.groupBy { RoomId(it.roomId) }
.mapValues { (_, requests) ->
requests.map { it.eventId }
requests.map { EventId(it.eventId) }
}
// TODO this notificationData is not always valid at the moment, sometimes the Rust SDK can't fetch the matching event
@ -125,7 +125,7 @@ class DefaultNotifiableEventResolver(
return Result.success(
notificationEventRequests.associate { request ->
val notificationDataResult = notificationDataMap[request.eventId]
val notificationDataResult = notificationDataMap[EventId(request.eventId)]
if (notificationDataResult == null) {
request to Result.failure(NotificationResolverException.UnknownError("No notification data for ${request.roomId} - ${request.eventId}"))
} else {

View file

@ -101,7 +101,7 @@ class DefaultNotificationMediaRepo(
}
}
private fun MediaSource.cachedFile(): File? = mxcTools.mxcUri2FilePath(url)?.let {
private fun MediaSource.cachedFile(): File? = mxcTools.mxcUri2FilePath(safeUrl)?.let {
File("${cacheDir.path}/$CACHE_NOTIFICATION_SUBDIR/$it")
}
}

View file

@ -1,125 +0,0 @@
/*
* Copyright (c) 2025 Element Creations Ltd.
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial.
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.notifications
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.ContributesBinding
import dev.zacsweers.metro.SingleIn
import io.element.android.libraries.di.annotations.AppCoroutineScope
import io.element.android.libraries.featureflag.api.FeatureFlagService
import io.element.android.libraries.featureflag.api.FeatureFlags
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.impl.notifications.model.ResolvedPushEvent
import io.element.android.libraries.push.impl.workmanager.SyncNotificationWorkManagerRequest
import io.element.android.libraries.push.impl.workmanager.SyncNotificationsWorkerDataConverter
import io.element.android.libraries.workmanager.api.WorkManagerScheduler
import io.element.android.services.toolbox.api.sdk.BuildVersionSdkIntProvider
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import timber.log.Timber
import kotlin.time.Duration.Companion.milliseconds
interface NotificationResolverQueue {
val results: SharedFlow<Pair<List<NotificationEventRequest>, Map<NotificationEventRequest, Result<ResolvedPushEvent>>>>
suspend fun enqueue(request: NotificationEventRequest)
}
/**
* This class is responsible for periodically batching notification requests and resolving them in a single call,
* so that we can avoid having to resolve each notification individually in the SDK.
*/
@OptIn(ExperimentalCoroutinesApi::class)
@SingleIn(AppScope::class)
@ContributesBinding(AppScope::class)
class DefaultNotificationResolverQueue(
private val notifiableEventResolver: NotifiableEventResolver,
@AppCoroutineScope
private val appCoroutineScope: CoroutineScope,
private val workManagerScheduler: WorkManagerScheduler,
private val featureFlagService: FeatureFlagService,
private val workerDataConverter: SyncNotificationsWorkerDataConverter,
private val buildVersionSdkIntProvider: BuildVersionSdkIntProvider,
) : NotificationResolverQueue {
companion object {
private const val BATCH_WINDOW_MS = 250L
}
private val requestQueue = Channel<NotificationEventRequest>(capacity = 100)
private var currentProcessingJob: Job? = null
/**
* A flow that emits pairs of a list of notification event requests and a map of the resolved events.
* The map contains the original request as the key and the resolved event as the value.
*/
override val results = MutableSharedFlow<Pair<List<NotificationEventRequest>, Map<NotificationEventRequest, Result<ResolvedPushEvent>>>>()
/**
* Enqueues a notification event request to be resolved.
* The request will be processed in batches, so it may not be resolved immediately.
*
* @param request The notification event request to enqueue.
*/
override suspend fun enqueue(request: NotificationEventRequest) {
// Cancel previous processing job if it exists, acting as a debounce operation
Timber.d("Cancelling job: $currentProcessingJob")
currentProcessingJob?.cancel()
// Enqueue the request and start a delayed processing job
requestQueue.send(request)
currentProcessingJob = processQueue()
Timber.d("Starting processing job for request: $request")
}
private fun processQueue() = appCoroutineScope.launch(SupervisorJob()) {
delay(BATCH_WINDOW_MS.milliseconds)
// If this job is still active (so this is the latest job), we launch a separate one that won't be cancelled when enqueueing new items
// to process the existing queued items.
appCoroutineScope.launch {
val groupedRequestsById = buildList {
while (!requestQueue.isEmpty) {
requestQueue.receiveCatching().getOrNull()?.let(::add)
}
}.groupBy { it.sessionId }
if (featureFlagService.isFeatureEnabled(FeatureFlags.SyncNotificationsWithWorkManager)) {
for ((sessionId, requests) in groupedRequestsById) {
workManagerScheduler.submit(
SyncNotificationWorkManagerRequest(
sessionId = sessionId,
notificationEventRequests = requests,
workerDataConverter = workerDataConverter,
buildVersionSdkIntProvider = buildVersionSdkIntProvider,
)
)
}
} else {
val sessionIds = groupedRequestsById.keys
for (sessionId in sessionIds) {
val requests = groupedRequestsById[sessionId].orEmpty()
Timber.d("Fetching notifications for $sessionId: $requests. Pending requests: ${!requestQueue.isEmpty}")
// Resolving the events in parallel should improve performance since each session id will query a different Client
launch {
// No need for a Mutex since the SDK already has one internally
val notifications = notifiableEventResolver.resolveEvents(sessionId, requests).getOrNull().orEmpty()
results.emit(requests to notifications)
}
}
}
}
}
}

View file

@ -0,0 +1,243 @@
/*
* Copyright (c) 2026 Element Creations Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial.
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.notifications
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.ContributesBinding
import dev.zacsweers.metro.SingleIn
import io.element.android.features.call.api.CallType
import io.element.android.features.call.api.ElementCallEntryPoint
import io.element.android.libraries.di.annotations.AppCoroutineScope
import io.element.android.libraries.featureflag.api.FeatureFlagService
import io.element.android.libraries.featureflag.api.FeatureFlags
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.matrix.api.exception.NotificationResolverException
import io.element.android.libraries.matrix.api.notification.CallIntent
import io.element.android.libraries.push.impl.db.PushRequest
import io.element.android.libraries.push.impl.history.PushHistoryService
import io.element.android.libraries.push.impl.history.onSuccess
import io.element.android.libraries.push.impl.history.onUnableToResolveEvent
import io.element.android.libraries.push.impl.notifications.channels.NotificationChannels
import io.element.android.libraries.push.impl.notifications.model.FallbackNotifiableEvent
import io.element.android.libraries.push.impl.notifications.model.NotifiableEvent
import io.element.android.libraries.push.impl.notifications.model.NotifiableRingingCallEvent
import io.element.android.libraries.push.impl.notifications.model.ResolvedPushEvent
import io.element.android.libraries.push.impl.push.MutableBatteryOptimizationStore
import io.element.android.libraries.push.impl.push.OnNotifiableEventReceived
import io.element.android.libraries.push.impl.push.OnRedactedEventReceived
import io.element.android.libraries.push.impl.push.SyncOnNotifiableEvent
import io.element.android.libraries.pushstore.api.UserPushStoreFactory
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import timber.log.Timber
private const val TAG = "NotifResultProcessor"
interface NotificationResultProcessor {
suspend fun emit(results: Map<PushRequest, Result<ResolvedPushEvent>>)
fun start()
fun stop()
}
@ContributesBinding(AppScope::class)
@SingleIn(AppScope::class)
class DefaultNotificationResultProcessor(
private val pushHistoryService: PushHistoryService,
private val batteryOptimizationStore: MutableBatteryOptimizationStore,
private val fallbackNotificationFactory: FallbackNotificationFactory,
private val userPushStoreFactory: UserPushStoreFactory,
private val onRedactedEventReceived: OnRedactedEventReceived,
private val onNotifiableEventReceived: OnNotifiableEventReceived,
private val featureFlagService: FeatureFlagService,
private val syncOnNotifiableEvent: SyncOnNotifiableEvent,
private val elementCallEntryPoint: ElementCallEntryPoint,
private val notificationChannels: NotificationChannels,
@AppCoroutineScope private val coroutineScope: CoroutineScope,
) : NotificationResultProcessor {
private val resultFlow = MutableSharedFlow<Map<PushRequest, Result<ResolvedPushEvent>>>(extraBufferCapacity = Int.MAX_VALUE)
private var processJob: Job? = null
override suspend fun emit(results: Map<PushRequest, Result<ResolvedPushEvent>>) {
resultFlow.emit(results)
}
override fun start() {
if (processJob?.isActive == true) {
Timber.tag(TAG).w("Is already processing, not starting again")
return
}
processJob = resultFlow
.onEach(::processResults)
.launchIn(coroutineScope)
}
override fun stop() {
if (processJob?.isActive != true) {
Timber.tag(TAG).w("Is not processing, not stopping")
return
}
processJob?.cancel()
processJob = null
}
private suspend fun processResults(results: Map<PushRequest, Result<ResolvedPushEvent>>) {
// TODO what happens with items that weren't reported back?
for ((request, result) in results) {
result.fold(
onSuccess = {
if (it is ResolvedPushEvent.Event && it.notifiableEvent is FallbackNotifiableEvent) {
pushHistoryService.onUnableToResolveEvent(
providerInfo = request.providerInfo,
eventId = EventId(request.eventId),
roomId = RoomId(request.roomId),
sessionId = SessionId(request.sessionId),
reason = it.notifiableEvent.cause.orEmpty(),
)
} else {
pushHistoryService.onSuccess(
providerInfo = request.providerInfo,
eventId = EventId(request.eventId),
roomId = RoomId(request.roomId),
sessionId = SessionId(request.sessionId),
comment = "Push handled successfully",
)
}
},
onFailure = { exception ->
if (exception is NotificationResolverException.EventFilteredOut) {
pushHistoryService.onSuccess(
providerInfo = request.providerInfo,
eventId = EventId(request.eventId),
roomId = RoomId(request.roomId),
sessionId = SessionId(request.sessionId),
comment = "Push handled successfully but notification was filtered out",
)
} else if (exception is NotificationResolverException.EventRedacted) {
pushHistoryService.onSuccess(
providerInfo = request.providerInfo,
eventId = EventId(request.eventId),
roomId = RoomId(request.roomId),
sessionId = SessionId(request.sessionId),
comment = "Push handled successfully but event has been redacted",
)
} else {
val reason = when (exception) {
is NotificationResolverException.EventNotFound -> "Event not found"
else -> "Unknown error: ${exception.message}"
}
pushHistoryService.onUnableToResolveEvent(
providerInfo = request.providerInfo,
eventId = EventId(request.eventId),
roomId = RoomId(request.roomId),
sessionId = SessionId(request.sessionId),
reason = "$reason - Showing fallback notification",
)
batteryOptimizationStore.showBatteryOptimizationBanner()
}
}
)
}
val events = mutableListOf<NotifiableEvent>()
val redactions = mutableListOf<ResolvedPushEvent.Redaction>()
@Suppress("LoopWithTooManyJumpStatements")
for ((request, result) in results) {
val event = result.recover { exception ->
// If the event could not be resolved, we create a fallback notification
when (exception) {
is NotificationResolverException.EventFilteredOut -> {
// Do nothing, we don't want to show a notification for filtered out events
null
}
is NotificationResolverException.EventRedacted -> {
// Do nothing, we don't want to show a notification for redacted events
null
}
else -> {
Timber.tag(TAG).e(exception, "Failed to resolve push event")
ResolvedPushEvent.Event(
fallbackNotificationFactory.create(
sessionId = SessionId(request.sessionId),
roomId = RoomId(request.roomId),
eventId = EventId(request.eventId),
cause = exception.message,
)
)
}
}
}.getOrNull() ?: continue
val userPushStore = userPushStoreFactory.getOrCreate(event.sessionId)
val areNotificationsEnabled = userPushStore.getNotificationEnabledForDevice().first()
// If notifications are disabled for this session and device, we don't want to show the notification
// But if it's a ringing call, we want to show it anyway
val isRingingCall = (event as? ResolvedPushEvent.Event)?.notifiableEvent is NotifiableRingingCallEvent
if (!areNotificationsEnabled && !isRingingCall) continue
// We categorise each result into either a NotifiableEvent or a Redaction
when (event) {
is ResolvedPushEvent.Event -> {
events.add(event.notifiableEvent)
}
is ResolvedPushEvent.Redaction -> {
redactions.add(event)
}
}
}
// Process redactions of messages in background to not block operations with higher priority
if (redactions.isNotEmpty()) {
coroutineScope.launch { onRedactedEventReceived.onRedactedEventsReceived(redactions) }
}
// Find and process ringing call notifications separately
val (ringingCallEvents, nonRingingCallEvents) = events.partition { it is NotifiableRingingCallEvent }
for (ringingCallEvent in ringingCallEvents) {
Timber.tag(TAG).d("Ringing call event: $ringingCallEvent")
handleRingingCallEvent(ringingCallEvent as NotifiableRingingCallEvent)
}
// Finally, process other notifications (messages, invites, generic notifications, etc.)
if (nonRingingCallEvents.isNotEmpty()) {
onNotifiableEventReceived.onNotifiableEventsReceived(nonRingingCallEvents)
}
if (!featureFlagService.isFeatureEnabled(FeatureFlags.SyncNotificationsWithWorkManager)) {
syncOnNotifiableEvent(results.keys.toList())
}
}
private suspend fun handleRingingCallEvent(notifiableEvent: NotifiableRingingCallEvent) {
Timber.i("## handleInternal() : Incoming call.")
elementCallEntryPoint.handleIncomingCall(
callType = CallType.RoomCall(
notifiableEvent.sessionId,
notifiableEvent.roomId,
isAudioCall = notifiableEvent.callIntent == CallIntent.AUDIO
),
eventId = notifiableEvent.eventId,
senderId = notifiableEvent.senderId,
roomName = notifiableEvent.roomName,
senderName = notifiableEvent.senderDisambiguatedDisplayName,
avatarUrl = notifiableEvent.roomAvatarUrl,
timestamp = notifiableEvent.timestamp,
expirationTimestamp = notifiableEvent.expirationTimestamp,
notificationChannelId = notificationChannels.getChannelForIncomingCall(ring = true),
textContent = notifiableEvent.description,
)
}
}

View file

@ -11,43 +11,28 @@ package io.element.android.libraries.push.impl.push
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.ContributesBinding
import dev.zacsweers.metro.SingleIn
import io.element.android.features.call.api.CallType
import io.element.android.features.call.api.ElementCallEntryPoint
import io.element.android.libraries.core.log.logger.LoggerTag
import io.element.android.libraries.core.meta.BuildMeta
import io.element.android.libraries.di.annotations.AppCoroutineScope
import io.element.android.libraries.featureflag.api.FeatureFlagService
import io.element.android.libraries.featureflag.api.FeatureFlags
import io.element.android.libraries.matrix.api.exception.NotificationResolverException
import io.element.android.libraries.matrix.api.notification.CallIntent
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent
import io.element.android.libraries.push.impl.db.PushRequest
import io.element.android.libraries.push.impl.history.PushHistoryService
import io.element.android.libraries.push.impl.history.onDiagnosticPush
import io.element.android.libraries.push.impl.history.onInvalidPushReceived
import io.element.android.libraries.push.impl.history.onSuccess
import io.element.android.libraries.push.impl.history.onUnableToResolveEvent
import io.element.android.libraries.push.impl.history.onUnableToRetrieveSession
import io.element.android.libraries.push.impl.notifications.FallbackNotificationFactory
import io.element.android.libraries.push.impl.notifications.NotificationResolverQueue
import io.element.android.libraries.push.impl.notifications.channels.NotificationChannels
import io.element.android.libraries.push.impl.notifications.model.FallbackNotifiableEvent
import io.element.android.libraries.push.impl.notifications.model.NotifiableEvent
import io.element.android.libraries.push.impl.notifications.model.NotifiableRingingCallEvent
import io.element.android.libraries.push.impl.notifications.model.ResolvedPushEvent
import io.element.android.libraries.push.impl.notifications.NotificationResultProcessor
import io.element.android.libraries.push.impl.test.DefaultTestPush
import io.element.android.libraries.push.impl.troubleshoot.DiagnosticPushHandler
import io.element.android.libraries.push.impl.workmanager.SyncPendingNotificationsRequestBuilder
import io.element.android.libraries.pushproviders.api.PushData
import io.element.android.libraries.pushproviders.api.PushHandler
import io.element.android.libraries.pushstore.api.UserPushStoreFactory
import io.element.android.libraries.pushstore.api.clientsecret.PushClientSecret
import io.element.android.libraries.workmanager.api.WorkManagerRequestType
import io.element.android.libraries.workmanager.api.WorkManagerScheduler
import io.element.android.services.analytics.api.AnalyticsLongRunningTransaction
import io.element.android.services.analytics.api.AnalyticsService
import kotlinx.coroutines.CoroutineScope
import io.element.android.services.toolbox.api.sdk.BuildVersionSdkIntProvider
import io.element.android.services.toolbox.api.systemclock.SystemClock
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import timber.log.Timber
private val loggerTag = LoggerTag("PushHandler", LoggerTag.PushLoggerTag)
@ -55,173 +40,20 @@ private val loggerTag = LoggerTag("PushHandler", LoggerTag.PushLoggerTag)
@SingleIn(AppScope::class)
@ContributesBinding(AppScope::class)
class DefaultPushHandler(
private val onNotifiableEventReceived: OnNotifiableEventReceived,
private val onRedactedEventReceived: OnRedactedEventReceived,
private val incrementPushDataStore: IncrementPushDataStore,
private val mutableBatteryOptimizationStore: MutableBatteryOptimizationStore,
private val userPushStoreFactory: UserPushStoreFactory,
private val pushClientSecret: PushClientSecret,
private val buildMeta: BuildMeta,
private val diagnosticPushHandler: DiagnosticPushHandler,
private val elementCallEntryPoint: ElementCallEntryPoint,
private val notificationChannels: NotificationChannels,
private val pushHistoryService: PushHistoryService,
private val resolverQueue: NotificationResolverQueue,
@AppCoroutineScope
private val appCoroutineScope: CoroutineScope,
private val fallbackNotificationFactory: FallbackNotificationFactory,
private val syncOnNotifiableEvent: SyncOnNotifiableEvent,
private val featureFlagService: FeatureFlagService,
private val userPushStoreFactory: UserPushStoreFactory,
private val analyticsService: AnalyticsService,
private val systemClock: SystemClock,
private val workManagerScheduler: WorkManagerScheduler,
private val buildVersionSdkIntProvider: BuildVersionSdkIntProvider,
resultProcessor: NotificationResultProcessor,
) : PushHandler {
init {
processPushEventResults()
}
/**
* Process the push notification event results emitted by the [resolverQueue].
*/
private fun processPushEventResults() {
resolverQueue.results
.map { (requests, resolvedEvents) ->
for (request in requests) {
// Log the result of the push notification event
val result = resolvedEvents[request]
if (result == null) {
pushHistoryService.onUnableToResolveEvent(
providerInfo = request.providerInfo,
eventId = request.eventId,
roomId = request.roomId,
sessionId = request.sessionId,
reason = "Push not handled: no result found for request",
)
} else {
result.fold(
onSuccess = {
if (it is ResolvedPushEvent.Event && it.notifiableEvent is FallbackNotifiableEvent) {
pushHistoryService.onUnableToResolveEvent(
providerInfo = request.providerInfo,
eventId = request.eventId,
roomId = request.roomId,
sessionId = request.sessionId,
reason = it.notifiableEvent.cause.orEmpty(),
)
} else {
pushHistoryService.onSuccess(
providerInfo = request.providerInfo,
eventId = request.eventId,
roomId = request.roomId,
sessionId = request.sessionId,
comment = "Push handled successfully",
)
}
},
onFailure = { exception ->
if (exception is NotificationResolverException.EventFilteredOut) {
pushHistoryService.onSuccess(
providerInfo = request.providerInfo,
eventId = request.eventId,
roomId = request.roomId,
sessionId = request.sessionId,
comment = "Push handled successfully but notification was filtered out",
)
} else if (exception is NotificationResolverException.EventRedacted) {
pushHistoryService.onSuccess(
providerInfo = request.providerInfo,
eventId = request.eventId,
roomId = request.roomId,
sessionId = request.sessionId,
comment = "Push handled successfully but event has been redacted",
)
} else {
val reason = when (exception) {
is NotificationResolverException.EventNotFound -> "Event not found"
else -> "Unknown error: ${exception.message}"
}
pushHistoryService.onUnableToResolveEvent(
providerInfo = request.providerInfo,
eventId = request.eventId,
roomId = request.roomId,
sessionId = request.sessionId,
reason = "$reason - Showing fallback notification",
)
mutableBatteryOptimizationStore.showBatteryOptimizationBanner()
}
}
)
}
}
val events = mutableListOf<NotifiableEvent>()
val redactions = mutableListOf<ResolvedPushEvent.Redaction>()
@Suppress("LoopWithTooManyJumpStatements")
for ((request, result) in resolvedEvents) {
val event = result.recover { exception ->
// If the event could not be resolved, we create a fallback notification
when (exception) {
is NotificationResolverException.EventFilteredOut -> {
// Do nothing, we don't want to show a notification for filtered out events
null
}
is NotificationResolverException.EventRedacted -> {
// Do nothing, we don't want to show a notification for redacted events
null
}
else -> {
Timber.tag(loggerTag.value).e(exception, "Failed to resolve push event")
ResolvedPushEvent.Event(
fallbackNotificationFactory.create(
sessionId = request.sessionId,
roomId = request.roomId,
eventId = request.eventId,
cause = exception.message,
)
)
}
}
}.getOrNull() ?: continue
val userPushStore = userPushStoreFactory.getOrCreate(event.sessionId)
val areNotificationsEnabled = userPushStore.getNotificationEnabledForDevice().first()
// If notifications are disabled for this session and device, we don't want to show the notification
// But if it's a ringing call, we want to show it anyway
val isRingingCall = (event as? ResolvedPushEvent.Event)?.notifiableEvent is NotifiableRingingCallEvent
if (!areNotificationsEnabled && !isRingingCall) continue
// We categorise each result into either a NotifiableEvent or a Redaction
when (event) {
is ResolvedPushEvent.Event -> {
events.add(event.notifiableEvent)
}
is ResolvedPushEvent.Redaction -> {
redactions.add(event)
}
}
}
// Process redactions of messages in background to not block operations with higher priority
if (redactions.isNotEmpty()) {
appCoroutineScope.launch { onRedactedEventReceived.onRedactedEventsReceived(redactions) }
}
// Find and process ringing call notifications separately
val (ringingCallEvents, nonRingingCallEvents) = events.partition { it is NotifiableRingingCallEvent }
for (ringingCallEvent in ringingCallEvents) {
Timber.tag(loggerTag.value).d("Ringing call event: $ringingCallEvent")
handleRingingCallEvent(ringingCallEvent as NotifiableRingingCallEvent)
}
// Finally, process other notifications (messages, invites, generic notifications, etc.)
if (nonRingingCallEvents.isNotEmpty()) {
onNotifiableEventReceived.onNotifiableEventsReceived(nonRingingCallEvents)
}
if (!featureFlagService.isFeatureEnabled(FeatureFlags.SyncNotificationsWithWorkManager)) {
syncOnNotifiableEvent(requests)
}
}
.launchIn(appCoroutineScope)
resultProcessor.start()
}
/**
@ -234,9 +66,7 @@ class DefaultPushHandler(
// Start measuring how long it takes to display a notification from when the push is received
Timber.d("Calculating push-to-notification for event ${pushData.eventId}")
val parent = analyticsService.startLongRunningTransaction(AnalyticsLongRunningTransaction.PushToNotification(pushData.eventId.value))
if (featureFlagService.isFeatureEnabled(FeatureFlags.SyncNotificationsWithWorkManager)) {
analyticsService.startLongRunningTransaction(AnalyticsLongRunningTransaction.PushToWorkManager(pushData.eventId.value), parent)
}
analyticsService.startLongRunningTransaction(AnalyticsLongRunningTransaction.PushToWorkManager(pushData.eventId.value), parent)
Timber.tag(loggerTag.value).d("## handling pushData: ${pushData.roomId}/${pushData.eventId}")
if (buildMeta.lowPrivacyLoggingEnabled) {
@ -283,38 +113,56 @@ class DefaultPushHandler(
return
}
appCoroutineScope.launch {
val notificationEventRequest = NotificationEventRequest(
sessionId = userId,
roomId = pushData.roomId,
eventId = pushData.eventId,
providerInfo = providerInfo,
val areNotificationsEnabled = userPushStoreFactory.getOrCreate(userId).getNotificationEnabledForDevice().first()
if (!areNotificationsEnabled) {
Timber.w("Push notification received when push notifications are disabled.")
return
}
val pushRequest = PushRequest(
pushDate = systemClock.epochMillis(),
providerInfo = providerInfo,
eventId = pushData.eventId.value,
roomId = pushData.roomId.value,
sessionId = userId.value,
status = PushRequestStatus.PENDING.value,
retries = 0L,
)
Timber.d("Queueing notification: $pushRequest")
pushHistoryService.insertOrUpdatePushRequest(pushRequest)
if (!workManagerScheduler.hasPendingWork(userId, WorkManagerRequestType.NOTIFICATION_SYNC)) {
Timber.d("No pending worker for push notifications found")
workManagerScheduler.submit(
SyncPendingNotificationsRequestBuilder(
sessionId = userId,
buildVersionSdkIntProvider = buildVersionSdkIntProvider,
)
)
Timber.d("Queueing notification: $notificationEventRequest")
resolverQueue.enqueue(notificationEventRequest)
}
} catch (e: Exception) {
Timber.tag(loggerTag.value).e(e, "## handleInternal() failed")
}
}
private suspend fun handleRingingCallEvent(notifiableEvent: NotifiableRingingCallEvent) {
Timber.i("## handleInternal() : Incoming call.")
elementCallEntryPoint.handleIncomingCall(
callType = CallType.RoomCall(
notifiableEvent.sessionId,
notifiableEvent.roomId,
isAudioCall = notifiableEvent.callIntent == CallIntent.AUDIO
),
eventId = notifiableEvent.eventId,
senderId = notifiableEvent.senderId,
roomName = notifiableEvent.roomName,
senderName = notifiableEvent.senderDisambiguatedDisplayName,
avatarUrl = notifiableEvent.roomAvatarUrl,
timestamp = notifiableEvent.timestamp,
expirationTimestamp = notifiableEvent.expirationTimestamp,
notificationChannelId = notificationChannels.getChannelForIncomingCall(ring = true),
textContent = notifiableEvent.description,
)
}
}
/**
* Represents the status of a [PushRequest].
*/
enum class PushRequestStatus(val value: Long) {
/**
* Either it was enqueued, and we never tried to fetch it, or it failed with a recoverable error.
*/
PENDING(0),
/**
* The event for the [PushRequest] was fetched successfully.
*/
SUCCESS(1),
/**
* Fetching the event for the [PushRequest] failed with an unrecoverable error, and it won't be retried.
*/
FAILED(2),
}

View file

@ -14,8 +14,9 @@ import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.featureflag.api.FeatureFlagService
import io.element.android.libraries.featureflag.api.FeatureFlags
import io.element.android.libraries.matrix.api.MatrixClientProvider
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.push.impl.db.PushRequest
import io.element.android.services.appnavstate.api.AppForegroundStateService
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
@ -29,7 +30,7 @@ class DefaultSyncOnNotifiableEvent(
private val appForegroundStateService: AppForegroundStateService,
private val dispatchers: CoroutineDispatchers,
) : SyncOnNotifiableEvent {
override suspend operator fun invoke(requests: List<NotificationEventRequest>) = withContext(dispatchers.io) {
override suspend operator fun invoke(requests: List<PushRequest>) = withContext(dispatchers.io) {
if (!featureFlagService.isFeatureEnabled(FeatureFlags.SyncOnPush)) {
return@withContext
}
@ -41,8 +42,8 @@ class DefaultSyncOnNotifiableEvent(
Timber.d("Starting opportunistic room list sync | In foreground: ${appForegroundStateService.isInForeground.value}")
for ((sessionId, events) in eventsBySession) {
val client = matrixClientProvider.getOrRestore(sessionId).getOrNull() ?: continue
val roomIds = events.map { it.roomId }.distinct()
val client = matrixClientProvider.getOrRestore(SessionId(sessionId)).getOrNull() ?: continue
val roomIds = events.map { RoomId(it.roomId) }.distinct()
client.roomListService.subscribeToVisibleRooms(roomIds)

View file

@ -0,0 +1,14 @@
/*
* Copyright (c) 2026 Element Creations Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial.
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.push
import io.element.android.libraries.push.impl.db.PushRequest
fun interface SyncOnNotifiableEvent {
suspend operator fun invoke(requests: List<PushRequest>)
}

View file

@ -8,8 +8,8 @@
package io.element.android.libraries.push.impl.troubleshoot
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.ContributesIntoSet
import io.element.android.libraries.di.SessionScope
import io.element.android.libraries.push.impl.R
import io.element.android.libraries.pushproviders.api.PushProvider
import io.element.android.libraries.troubleshoot.api.test.NotificationTroubleshootTest
@ -19,7 +19,7 @@ import io.element.android.services.toolbox.api.strings.StringProvider
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.StateFlow
@ContributesIntoSet(AppScope::class)
@ContributesIntoSet(SessionScope::class)
class PushProvidersTest(
pushProviders: Set<@JvmSuppressWildcards PushProvider>,
private val stringProvider: StringProvider,

View file

@ -1,191 +0,0 @@
/*
* Copyright (c) 2025 Element Creations Ltd.
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial.
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.workmanager
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.Assisted
import dev.zacsweers.metro.AssistedFactory
import dev.zacsweers.metro.AssistedInject
import dev.zacsweers.metro.ContributesIntoMap
import dev.zacsweers.metro.binding
import io.element.android.features.networkmonitor.api.NetworkMonitor
import io.element.android.features.networkmonitor.api.NetworkStatus
import io.element.android.libraries.core.extensions.runCatchingExceptions
import io.element.android.libraries.di.annotations.ApplicationContext
import io.element.android.libraries.matrix.api.auth.SessionRestorationException
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.push.api.push.SyncOnNotifiableEvent
import io.element.android.libraries.push.impl.notifications.NotifiableEventResolver
import io.element.android.libraries.push.impl.notifications.NotificationResolverQueue
import io.element.android.libraries.workmanager.api.WorkManagerScheduler
import io.element.android.libraries.workmanager.api.di.MetroWorkerFactory
import io.element.android.libraries.workmanager.api.di.WorkerKey
import io.element.android.services.analytics.api.AnalyticsLongRunningTransaction
import io.element.android.services.analytics.api.AnalyticsService
import io.element.android.services.analytics.api.finishLongRunningTransaction
import io.element.android.services.analytics.api.recordTransaction
import io.element.android.services.analyticsproviders.api.AnalyticsTransaction
import io.element.android.services.toolbox.api.sdk.BuildVersionSdkIntProvider
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.withTimeoutOrNull
import timber.log.Timber
import kotlin.time.Duration.Companion.seconds
@AssistedInject
class FetchNotificationsWorker(
@Assisted params: WorkerParameters,
@ApplicationContext private val context: Context,
private val networkMonitor: NetworkMonitor,
private val eventResolver: NotifiableEventResolver,
private val queue: NotificationResolverQueue,
private val workManagerScheduler: WorkManagerScheduler,
private val syncOnNotifiableEvent: SyncOnNotifiableEvent,
private val workerDataConverter: SyncNotificationsWorkerDataConverter,
private val buildVersionSdkIntProvider: BuildVersionSdkIntProvider,
private val analyticsService: AnalyticsService,
) : CoroutineWorker(context, params) {
override suspend fun doWork(): Result {
Timber.d("FetchNotificationsWorker started")
val requests = workerDataConverter.deserialize(inputData) ?: return Result.failure()
val networkTimeoutSpans = requests.mapNotNull { request ->
val parent = analyticsService.getLongRunningTransaction(AnalyticsLongRunningTransaction.PushToWorkManager(request.eventId.value))
parent?.startChild("Waiting for network connectivity", "await_network")
}
// Wait for network to be available, but not more than 10 seconds
val hasNetwork = withTimeoutOrNull(10.seconds) {
networkMonitor.connectivity.first { it == NetworkStatus.Connected }
} != null
networkTimeoutSpans.finish()
// If there is a problem with the updated network values, report it and retry if needed
if (reportConnectivityError(requests = requests, hasNetwork = hasNetwork, isNetworkBlocked = networkMonitor.isNetworkBlocked())) {
return Result.retry()
}
val pendingAnalyticTransactions = requests.mapNotNull { request ->
analyticsService.finishLongRunningTransaction(AnalyticsLongRunningTransaction.PushToWorkManager(request.eventId.value))
val parent = analyticsService.getLongRunningTransaction(AnalyticsLongRunningTransaction.PushToNotification(request.eventId.value))
val transactionName = "WorkManager to event fetched"
parent?.startChild(transactionName)?.let { request.eventId to it }
}.toMap()
val failedSyncForSessions = mutableMapOf<SessionId, Throwable>()
val groupedRequests = requests.groupBy { it.sessionId }.toMutableMap()
for ((sessionId, notificationRequests) in groupedRequests) {
Timber.d("Processing notification requests for session $sessionId")
eventResolver.resolveEvents(sessionId, notificationRequests)
.fold(
onSuccess = { result ->
for ((_, transaction) in pendingAnalyticTransactions) {
transaction.finish()
}
// Update the resolved results in the queue
(queue.results as MutableSharedFlow).emit(requests to result)
},
onFailure = {
for ((_, transaction) in pendingAnalyticTransactions) {
transaction.attachError(it)
transaction.finish()
}
failedSyncForSessions[sessionId] = it
Timber.e(it, "Failed to resolve notification events for session $sessionId")
}
)
}
// If there were failures for whole sessions, we retry all their requests
if (failedSyncForSessions.isNotEmpty()) {
@Suppress("LoopWithTooManyJumpStatements")
for ((failedSessionId, exception) in failedSyncForSessions) {
if (exception.cause is SessionRestorationException) {
Timber.e(exception, "Session $failedSessionId could not be restored, not retrying notification fetching")
groupedRequests.remove(failedSessionId)
continue
}
val requestsToRetry = groupedRequests[failedSessionId] ?: continue
for (request in requestsToRetry) {
val failedTransaction = pendingAnalyticTransactions[request.eventId]
failedTransaction?.attachError(exception)
failedTransaction?.finish()
val eventId = request.eventId.value
val parent = analyticsService.getLongRunningTransaction(AnalyticsLongRunningTransaction.PushToNotification(eventId))
// Since we're retrying, start a new transaction
analyticsService.startLongRunningTransaction(AnalyticsLongRunningTransaction.PushToWorkManager(eventId), parent)
}
Timber.d("Re-scheduling ${requestsToRetry.size} failed notification requests for session $failedSessionId")
workManagerScheduler.submit(
SyncNotificationWorkManagerRequest(
sessionId = failedSessionId,
notificationEventRequests = requestsToRetry,
workerDataConverter = workerDataConverter,
buildVersionSdkIntProvider = buildVersionSdkIntProvider,
)
)
}
}
Timber.d("Notifications processed successfully")
analyticsService.recordTransaction("Opportunistic sync", "opportunistic_sync") {
performOpportunisticSyncIfNeeded(groupedRequests)
}
return Result.success()
}
private fun reportConnectivityError(requests: List<NotificationEventRequest>, hasNetwork: Boolean, isNetworkBlocked: Boolean): Boolean {
return if (!hasNetwork || isNetworkBlocked) {
for (request in requests) {
val eventId = request.eventId.value
analyticsService.finishLongRunningTransaction(AnalyticsLongRunningTransaction.PushToWorkManager(eventId)) {
it.putExtraData("has_network_connection", hasNetwork.toString())
it.putExtraData("is_network_blocked", isNetworkBlocked.toString())
}
val parent = analyticsService.getLongRunningTransaction(AnalyticsLongRunningTransaction.PushToNotification(eventId))
// Since we're retrying, start a new transaction
analyticsService.startLongRunningTransaction(AnalyticsLongRunningTransaction.PushToWorkManager(eventId), parent)
}
Timber.w("FetchNotificationsWorker will retry. Has network connectivity: $hasNetwork. Is network blocked: $isNetworkBlocked")
true
} else {
false
}
}
private suspend fun performOpportunisticSyncIfNeeded(
groupedRequests: Map<SessionId, List<NotificationEventRequest>>,
) {
for ((sessionId, notificationRequests) in groupedRequests) {
runCatchingExceptions {
syncOnNotifiableEvent(notificationRequests)
}.onFailure {
Timber.e(it, "Failed to sync on notifiable events for session $sessionId")
}
}
}
@ContributesIntoMap(AppScope::class, binding = binding<MetroWorkerFactory.WorkerInstanceFactory<*>>())
@WorkerKey(FetchNotificationsWorker::class)
@AssistedFactory
interface Factory : MetroWorkerFactory.WorkerInstanceFactory<FetchNotificationsWorker>
}
private fun <T : AnalyticsTransaction> Collection<T>.finish() = forEach { it.finish() }

View file

@ -0,0 +1,239 @@
/*
* Copyright (c) 2025 Element Creations Ltd.
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial.
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.workmanager
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import dev.zacsweers.metro.AppScope
import dev.zacsweers.metro.Assisted
import dev.zacsweers.metro.AssistedFactory
import dev.zacsweers.metro.AssistedInject
import dev.zacsweers.metro.ContributesIntoMap
import dev.zacsweers.metro.binding
import io.element.android.features.networkmonitor.api.NetworkMonitor
import io.element.android.features.networkmonitor.api.NetworkStatus
import io.element.android.libraries.core.extensions.runCatchingExceptions
import io.element.android.libraries.di.annotations.ApplicationContext
import io.element.android.libraries.matrix.api.auth.SessionRestorationException
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.matrix.api.exception.ClientException
import io.element.android.libraries.matrix.api.exception.isNetworkError
import io.element.android.libraries.push.impl.db.PushRequest
import io.element.android.libraries.push.impl.history.PushHistoryService
import io.element.android.libraries.push.impl.notifications.NotifiableEventResolver
import io.element.android.libraries.push.impl.notifications.NotificationResultProcessor
import io.element.android.libraries.push.impl.push.PushRequestStatus
import io.element.android.libraries.push.impl.push.SyncOnNotifiableEvent
import io.element.android.libraries.workmanager.api.di.MetroWorkerFactory
import io.element.android.libraries.workmanager.api.di.WorkerKey
import io.element.android.services.analytics.api.AnalyticsLongRunningTransaction
import io.element.android.services.analytics.api.AnalyticsService
import io.element.android.services.analytics.api.finishLongRunningTransaction
import io.element.android.services.analytics.api.recordTransaction
import io.element.android.services.analyticsproviders.api.AnalyticsTransaction
import io.element.android.services.toolbox.api.systemclock.SystemClock
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.withTimeoutOrNull
import timber.log.Timber
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.seconds
import kotlin.time.Instant
@AssistedInject
class FetchPendingNotificationsWorker(
@Assisted private val params: WorkerParameters,
@ApplicationContext private val context: Context,
private val pushHistoryService: PushHistoryService,
private val networkMonitor: NetworkMonitor,
private val eventResolver: NotifiableEventResolver,
private val syncOnNotifiableEvent: SyncOnNotifiableEvent,
private val resultProcessor: NotificationResultProcessor,
private val analyticsService: AnalyticsService,
private val systemClock: SystemClock,
) : CoroutineWorker(context, params) {
override suspend fun doWork(): Result {
Timber.d("FetchNotificationsWorker started")
// RunCatching for test in debug mode
val sessionId = runCatchingExceptions {
inputData.getString(SyncPendingNotificationsRequestBuilder.SESSION_ID)?.let(::SessionId)
}.getOrNull() ?: return Result.failure()
// Fetch pending requests in the last 24 hours
val fetchSince = Instant.fromEpochMilliseconds(systemClock.epochMillis()).minus(1.days)
val requests = pushHistoryService.getPendingPushRequests(sessionId, fetchSince).getOrNull() ?: return Result.failure()
pushHistoryService.removeOldPushRequests(sessionId).onFailure {
Timber.e(it, "Could not remove outdated push requests")
}
if (requests.isEmpty()) {
Timber.d("No pending notifications to fetch, returning early")
return Result.success()
}
checkNetworkConnection(requests)?.let { failure -> return failure }
Timber.d("Fetching ${requests.size} push requests")
val pendingAnalyticTransactions = requests.mapNotNull { request ->
analyticsService.finishLongRunningTransaction(AnalyticsLongRunningTransaction.PushToWorkManager(request.eventId))
val parent = analyticsService.getLongRunningTransaction(AnalyticsLongRunningTransaction.PushToNotification(request.eventId))
val transactionName = "WorkManager to event fetched"
parent?.startChild(transactionName)?.let { request.eventId to it }
}.toMap()
Timber.d("Processing notification requests for session $sessionId")
val results = eventResolver.resolveEvents(sessionId, requests)
.fold(
onSuccess = { results ->
for ((_, transaction) in pendingAnalyticTransactions) {
transaction.finish()
}
// Update the resolved results in the queue
resultProcessor.emit(results)
results
},
onFailure = {
// This is a failure at the fetch notification setup, not a failure for a single fetch notification operation
return handleSetupError(sessionId, requests, pendingAnalyticTransactions, it)
}
)
val updatedRequests = mutableListOf<PushRequest>()
for (request in requests) {
val result = results[request] ?: continue
result.fold(
onSuccess = { updatedRequests.add(request.copy(status = PushRequestStatus.SUCCESS.value)) },
onFailure = { exception ->
if (exception is ClientException && exception.isNetworkError()) {
// Reset to pending so we can retry it later
updatedRequests.add(request.copy(status = PushRequestStatus.PENDING.value))
} else {
updatedRequests.add(request.copy(status = PushRequestStatus.FAILED.value))
}
}
)
}
Timber.d("Notifications processed successfully")
pushHistoryService.insertOrUpdatePushRequests(updatedRequests)
analyticsService.recordTransaction("Opportunistic sync", "opportunistic_sync") {
performOpportunisticSyncIfNeeded(mapOf(sessionId to requests))
}
return if (updatedRequests.any { it.status == PushRequestStatus.PENDING.value }) Result.retry() else Result.success()
}
private suspend fun performOpportunisticSyncIfNeeded(
groupedRequests: Map<SessionId, List<PushRequest>>,
) {
for ((sessionId, notificationRequests) in groupedRequests) {
runCatchingExceptions {
syncOnNotifiableEvent(notificationRequests)
}.onFailure {
Timber.e(it, "Failed to sync on notifiable events for session $sessionId")
}
}
}
private suspend fun checkNetworkConnection(requests: List<PushRequest>): Result? {
val networkTimeoutSpans = requests.mapNotNull { request ->
val parent = analyticsService.getLongRunningTransaction(AnalyticsLongRunningTransaction.PushToWorkManager(request.eventId))
parent?.startChild("Waiting for network connectivity", "await_network")
}
// Wait for network to be available, but not more than 10 seconds
val hasNetwork = withTimeoutOrNull(10.seconds) {
networkMonitor.connectivity.first { it == NetworkStatus.Connected }
} != null
networkTimeoutSpans.finish()
// If there is a problem with the updated network values, report it and retry if needed
if (reportConnectivityError(requests = requests, hasNetwork = hasNetwork, isNetworkBlocked = networkMonitor.isNetworkBlocked())) {
pushHistoryService.insertOrUpdatePushRequests(requests.map { request ->
request.copy(retries = request.retries + 1)
})
return Result.retry()
}
return null
}
private fun reportConnectivityError(requests: List<PushRequest>, hasNetwork: Boolean, isNetworkBlocked: Boolean): Boolean {
return if (!hasNetwork || isNetworkBlocked) {
for (request in requests) {
val eventId = request.eventId
analyticsService.finishLongRunningTransaction(AnalyticsLongRunningTransaction.PushToWorkManager(eventId)) {
it.putExtraData("has_network_connection", hasNetwork.toString())
it.putExtraData("is_network_blocked", isNetworkBlocked.toString())
}
val parent = analyticsService.getLongRunningTransaction(AnalyticsLongRunningTransaction.PushToNotification(eventId))
// Since we're retrying, start a new transaction
analyticsService.startLongRunningTransaction(AnalyticsLongRunningTransaction.PushToWorkManager(eventId), parent)
}
Timber.w("FetchNotificationsWorker will retry. Has network connectivity: $hasNetwork. Is network blocked: $isNetworkBlocked")
true
} else {
false
}
}
private suspend fun handleSetupError(
sessionId: SessionId,
requests: List<PushRequest>,
pendingAnalyticTransactions: Map<String, AnalyticsTransaction>,
throwable: Throwable,
): Result {
for ((_, transaction) in pendingAnalyticTransactions) {
transaction.attachError(throwable)
transaction.finish()
}
// If there were failures on the setup step and they weren't recoverable, update the requests and fail
if (throwable.cause is SessionRestorationException) {
Timber.e(throwable, "Session $sessionId could not be restored, not retrying notification fetching")
pushHistoryService.insertOrUpdatePushRequests(requests.map { request ->
request.copy(status = PushRequestStatus.FAILED.value)
})
return Result.failure()
}
// If the failure is recoverable, retry
for (request in requests) {
val failedTransaction = pendingAnalyticTransactions[request.eventId]
failedTransaction?.attachError(throwable)
failedTransaction?.finish()
val eventId = request.eventId
val parent = analyticsService.getLongRunningTransaction(AnalyticsLongRunningTransaction.PushToNotification(eventId))
// Since we're retrying, start a new transaction
analyticsService.startLongRunningTransaction(AnalyticsLongRunningTransaction.PushToWorkManager(eventId), parent)
}
Timber.d("Re-scheduling ${requests.size} failed notification requests for session $sessionId")
pushHistoryService.insertOrUpdatePushRequests(requests.map { request ->
request.copy(retries = request.retries + 1)
})
return Result.retry()
}
@ContributesIntoMap(AppScope::class, binding = binding<MetroWorkerFactory.WorkerInstanceFactory<*>>())
@WorkerKey(FetchPendingNotificationsWorker::class)
@AssistedFactory
interface Factory : MetroWorkerFactory.WorkerInstanceFactory<FetchPendingNotificationsWorker>
}
private fun <T : AnalyticsTransaction> Collection<T>.finish() = forEach { it.finish() }

View file

@ -1,68 +0,0 @@
/*
* Copyright (c) 2025 Element Creations Ltd.
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial.
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.workmanager
import android.os.Build
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.OutOfQuotaPolicy
import androidx.work.WorkRequest
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.push.api.push.NotificationEventRequest
import io.element.android.libraries.workmanager.api.WorkManagerRequest
import io.element.android.libraries.workmanager.api.WorkManagerRequestType
import io.element.android.libraries.workmanager.api.workManagerTag
import io.element.android.services.toolbox.api.sdk.BuildVersionSdkIntProvider
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import timber.log.Timber
import java.security.InvalidParameterException
class SyncNotificationWorkManagerRequest(
private val sessionId: SessionId,
private val notificationEventRequests: List<NotificationEventRequest>,
private val workerDataConverter: SyncNotificationsWorkerDataConverter,
private val buildVersionSdkIntProvider: BuildVersionSdkIntProvider,
) : WorkManagerRequest {
override fun build(): Result<List<WorkRequest>> {
if (notificationEventRequests.isEmpty()) {
return Result.failure(InvalidParameterException("notificationEventRequests cannot be empty"))
}
Timber.d("Scheduling ${notificationEventRequests.size} notification requests with WorkManager for $sessionId")
return workerDataConverter.serialize(notificationEventRequests).map { dataList ->
dataList.map { data ->
OneTimeWorkRequestBuilder<FetchNotificationsWorker>()
.setInputData(data)
.apply {
// Expedited workers aren't needed on Android 12 or lower:
// They force displaying a foreground sync notification for no good reason, since they sync almost immediately anyway
// See https://developer.android.com/develop/background-work/background-tasks/persistent/getting-started/define-work#backwards-compat
if (buildVersionSdkIntProvider.isAtLeast(Build.VERSION_CODES.TIRAMISU)) {
setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST)
}
}
.setTraceTag(workManagerTag(sessionId, WorkManagerRequestType.NOTIFICATION_SYNC))
// TODO investigate using this instead of the resolver queue
// .setInputMerger()
.build()
}
}
}
@Serializable
data class Data(
@SerialName("session_id")
val sessionId: String,
@SerialName("room_id")
val roomId: String,
@SerialName("event_id")
val eventId: String,
@SerialName("provider_info")
val providerInfo: String,
)
}

View file

@ -1,129 +0,0 @@
/*
* Copyright (c) 2025 Element Creations Ltd.
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial.
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.workmanager
import androidx.work.Data
import androidx.work.workDataOf
import dev.zacsweers.metro.Inject
import io.element.android.libraries.androidutils.json.JsonProvider
import io.element.android.libraries.core.extensions.mapCatchingExceptions
import io.element.android.libraries.core.extensions.runCatchingExceptions
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.push.api.push.NotificationEventRequest
import timber.log.Timber
@Inject
class SyncNotificationsWorkerDataConverter(
private val json: JsonProvider,
) {
fun serialize(notificationEventRequests: List<NotificationEventRequest>): Result<List<Data>> {
// First try to serialize all requests at once. In the vast majority of cases this will work.
return serializeRequests(notificationEventRequests)
.map { listOf(it) }
.recoverCatching { t ->
if (t is DataForWorkManagerIsTooBig) {
// Perform serialization on sublists, workDataOf have failed because of size limit
Timber.w(t, "Failed to serialize ${notificationEventRequests.size} notification requests, split the requests per room.")
// Group the requests per rooms
val requestsSortedPerRoom = notificationEventRequests.groupBy { it.roomId }.values
// Build a list of sublist with size at most CHUNK_SIZE, and with all rooms kept together
buildList {
val currentChunk = mutableListOf<NotificationEventRequest>()
for (requests in requestsSortedPerRoom) {
if (currentChunk.size + requests.size <= CHUNK_SIZE) {
// Can add the whole room requests to the current chunk
currentChunk.addAll(requests)
} else {
// Add the current chunk
add(currentChunk.toList())
// Start a new chunk with the current room requests
currentChunk.clear()
// If a room has more requests than CHUNK_SIZE, we need to split them
requests.chunked(CHUNK_SIZE) { chunk ->
if (chunk.size == CHUNK_SIZE) {
add(chunk.toList())
} else {
currentChunk.addAll(chunk)
}
}
}
}
// Add any remaining requests
add(currentChunk.toList())
}
.filter { it.isNotEmpty() }
.also {
Timber.d("Split notification requests into ${it.size} chunks for WorkManager serialization")
it.forEach { requests ->
Timber.d(" - Chunk with ${requests.size} requests")
}
}
.mapNotNull { serializeRequests(it).getOrNull() }
} else {
throw t
}
}
}
private fun serializeRequests(notificationEventRequests: List<NotificationEventRequest>): Result<Data> {
return runCatchingExceptions { json().encodeToString(notificationEventRequests.map { it.toData() }) }
.onFailure {
Timber.e(it, "Failed to serialize notification requests")
}
.mapCatchingExceptions { str ->
// Note: workDataOf can fail if the data is too large
try {
workDataOf(REQUESTS_KEY to str)
} catch (_: IllegalStateException) {
throw DataForWorkManagerIsTooBig()
}
}
}
fun deserialize(data: Data): List<NotificationEventRequest>? {
val rawRequestsJson = data.getString(REQUESTS_KEY) ?: return null
return runCatchingExceptions {
json().decodeFromString<List<SyncNotificationWorkManagerRequest.Data>>(rawRequestsJson).map { it.toRequest() }
}.fold(
onSuccess = {
Timber.d("Deserialized ${it.size} requests")
it
},
onFailure = {
Timber.e(it, "Failed to deserialize notification requests")
null
}
)
}
companion object {
private const val REQUESTS_KEY = "requests"
internal const val CHUNK_SIZE = 20
}
}
private fun NotificationEventRequest.toData(): SyncNotificationWorkManagerRequest.Data {
return SyncNotificationWorkManagerRequest.Data(
sessionId = sessionId.value,
roomId = roomId.value,
eventId = eventId.value,
providerInfo = providerInfo,
)
}
private fun SyncNotificationWorkManagerRequest.Data.toRequest(): NotificationEventRequest {
return NotificationEventRequest(
sessionId = SessionId(sessionId),
roomId = RoomId(roomId),
eventId = EventId(eventId),
providerInfo = providerInfo,
)
}

View file

@ -0,0 +1,51 @@
/*
* Copyright (c) 2025 Element Creations Ltd.
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial.
* Please see LICENSE files in the repository root for full details.
*/
package io.element.android.libraries.push.impl.workmanager
import android.os.Build
import androidx.work.ExistingWorkPolicy
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.OutOfQuotaPolicy
import androidx.work.workDataOf
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.workmanager.api.WorkManagerRequestBuilder
import io.element.android.libraries.workmanager.api.WorkManagerRequestType
import io.element.android.libraries.workmanager.api.WorkManagerRequestWrapper
import io.element.android.libraries.workmanager.api.WorkManagerWorkerType
import io.element.android.libraries.workmanager.api.workManagerTag
import io.element.android.services.toolbox.api.sdk.BuildVersionSdkIntProvider
class SyncPendingNotificationsRequestBuilder(
private val sessionId: SessionId,
private val buildVersionSdkIntProvider: BuildVersionSdkIntProvider,
) : WorkManagerRequestBuilder {
companion object {
const val SESSION_ID = "session_id"
}
override suspend fun build(): Result<List<WorkManagerRequestWrapper>> {
val type = WorkManagerWorkerType.Unique(
name = workManagerTag(sessionId = sessionId, requestType = WorkManagerRequestType.NOTIFICATION_SYNC),
policy = ExistingWorkPolicy.APPEND_OR_REPLACE,
)
val request = OneTimeWorkRequestBuilder<FetchPendingNotificationsWorker>()
.setInputData(workDataOf(SESSION_ID to sessionId.value))
.apply {
// Expedited workers aren't needed on Android 12 or lower:
// They force displaying a foreground sync notification for no good reason, since they sync almost immediately anyway
// See https://developer.android.com/develop/background-work/background-tasks/persistent/getting-started/define-work#backwards-compat
if (buildVersionSdkIntProvider.isAtLeast(Build.VERSION_CODES.TIRAMISU)) {
setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST)
}
}
.setTraceTag(workManagerTag(sessionId, WorkManagerRequestType.NOTIFICATION_SYNC))
.build()
return Result.success(listOf(WorkManagerRequestWrapper(request, type)))
}
}

Binary file not shown.

Binary file not shown.

View file

@ -17,6 +17,5 @@ INSERT INTO PushHistory VALUES ?;
removeAll:
DELETE FROM PushHistory;
-- add query to keep only the last x entries
removeOldest:
DELETE FROM PushHistory WHERE rowid NOT IN (SELECT rowid FROM PushHistory ORDER BY pushDate DESC LIMIT ?);

View file

@ -0,0 +1,24 @@
CREATE TABLE PushRequest (
pushDate INTEGER NOT NULL,
providerInfo TEXT NOT NULL,
eventId TEXT NOT NULL,
roomId TEXT NOT NULL,
sessionId TEXT NOT NULL,
status INTEGER NOT NULL DEFAULT 0,
retries INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY(sessionId, eventId)
);
CREATE INDEX PushRequestSessionAndStatus ON PushRequest (sessionId, status);
selectAllPendingForSession:
SELECT * FROM PushRequest WHERE status = 0 AND sessionId = ? AND pushDate > ? ORDER BY pushDate ASC;
insertPushRequest:
INSERT OR REPLACE INTO PushRequest VALUES ?;
removeAll:
DELETE FROM PushRequest;
removeOldest:
DELETE FROM PushRequest WHERE rowid NOT IN (SELECT rowid FROM PushRequest ORDER BY pushDate DESC LIMIT ?);

View file

@ -0,0 +1,14 @@
-- Migrate DB from version 1
CREATE TABLE PushRequest (
pushDate INTEGER NOT NULL,
providerInfo TEXT NOT NULL,
eventId TEXT NOT NULL,
roomId TEXT NOT NULL,
sessionId TEXT NOT NULL,
status INTEGER NOT NULL DEFAULT 0,
retries INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY(sessionId, eventId)
);
CREATE INDEX PushRequestSessionAndStatus ON PushRequest (sessionId, status);