From 2cebb4fb5d2ae5770a2f4bfbe41f2af09d799505 Mon Sep 17 00:00:00 2001 From: Kayos Date: Wed, 11 Mar 2026 10:39:28 -0700 Subject: [PATCH] v6: retry indefinitely with backoff, never drop detections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove 5-attempt drop limit — retry forever until success - Exponential backoff for retries: 5s → 10s → 20s → 40s → 60s (capped) - Only drop detections if the queue exceeds 2000 detections (memory protection) - Drop oldest first when over limit (FIFO eviction) - Show next retry delay in error message - Reset backoff on successful send Handles intermittent 502s from ADAMaps gracefully by holding queue until server recovers. --- .../varroa/service/ForwardingService.kt | 112 ++++++++++++------ 1 file changed, 77 insertions(+), 35 deletions(-) diff --git a/app/src/main/java/com/adamaps/varroa/service/ForwardingService.kt b/app/src/main/java/com/adamaps/varroa/service/ForwardingService.kt index a3b2d1e..1555984 100644 --- a/app/src/main/java/com/adamaps/varroa/service/ForwardingService.kt +++ b/app/src/main/java/com/adamaps/varroa/service/ForwardingService.kt @@ -41,11 +41,16 @@ class ForwardingService : LifecycleService() { const val ACTION_START = "com.adamaps.varroa.START_FORWARDING" const val ACTION_STOP = "com.adamaps.varroa.STOP_FORWARDING" - // Exponential backoff constants + // Exponential backoff constants (Bee polling) private const val MIN_BACKOFF_MS = 5_000L // 5 seconds private const val MAX_BACKOFF_MS = 60_000L // 60 seconds private const val BACKOFF_MULTIPLIER = 2.0 + // ADAMaps retry queue constants + private const val RETRY_MIN_BACKOFF_MS = 5_000L // 5 seconds + private const val RETRY_MAX_BACKOFF_MS = 60_000L // 60 seconds (cap) + private const val MAX_QUEUE_SIZE = 2000 // Max detections to hold in memory + // Shared state exposed to ViewModels private val _stats = MutableStateFlow(SessionStats()) val stats: StateFlow = _stats.asStateFlow() @@ -75,17 +80,18 @@ class ForwardingService 
: LifecycleService() { private var reachJob: Job? = null private var retryJob: Job? = null - // Exponential backoff state + // Exponential backoff state (Bee polling) private var currentBackoffMs = MIN_BACKOFF_MS private var consecutiveFailures = 0 // Retry queue for failed ADAMaps sends private val pendingQueue = ConcurrentLinkedQueue() + private var retryBackoffMs = RETRY_MIN_BACKOFF_MS + private var retryConsecutiveFailures = 0 private data class QueuedIngest( val deviceId: String, - val detections: List, - var attempts: Int = 0 + val detections: List ) override fun onCreate() { @@ -173,7 +179,8 @@ class ForwardingService : LifecycleService() { retryJob?.cancel() retryJob = lifecycleScope.launch { while (true) { - delay(10_000L) // Check retry queue every 10 seconds + // Use exponential backoff: 5s → 10s → 20s → 40s → 60s (capped) + delay(retryBackoffMs) processRetryQueue() } } @@ -256,51 +263,86 @@ class ForwardingService : LifecycleService() { is ApiResult.Success -> { _stats.update { it.copy(sent = it.sent + detections.size, queued = it.queued - detections.size) } _adamapsReachable.value = true + // Reset retry backoff on direct success + retryConsecutiveFailures = 0 + retryBackoffMs = RETRY_MIN_BACKOFF_MS } is ApiResult.Error -> { _lastError.value = "ADAMaps: ${result.message}" _adamapsReachable.value = false // Add to retry queue instead of losing detections pendingQueue.add(QueuedIngest(deviceId, detections)) - // Keep queued count accurate + + // Enforce max queue size — drop OLDEST if over limit + enforceQueueLimit() } } } - private suspend fun processRetryQueue() { - if (pendingQueue.isEmpty() || !_adamapsReachable.value) return + /** Drop oldest detections if queue exceeds MAX_QUEUE_SIZE to prevent OOM */ + private fun enforceQueueLimit() { + var totalQueued = pendingQueue.sumOf { it.detections.size } + var dropped = 0 + + while (totalQueued > MAX_QUEUE_SIZE && pendingQueue.isNotEmpty()) { + val oldest = pendingQueue.poll() ?: break + dropped += 
oldest.detections.size + totalQueued -= oldest.detections.size + } + + if (dropped > 0) { + _stats.update { it.copy(queued = it.queued - dropped) } + _lastError.value = "Queue full: dropped $dropped oldest detections" + } + } - val toRetry = mutableListOf() - while (pendingQueue.isNotEmpty()) { - pendingQueue.poll()?.let { toRetry.add(it) } + private suspend fun processRetryQueue() { + if (pendingQueue.isEmpty()) return + + // Don't retry if ADAMaps is known to be down — wait for reachability check + if (!_adamapsReachable.value) { + // Increase backoff while waiting + retryConsecutiveFailures++ + retryBackoffMs = min( + (RETRY_MIN_BACKOFF_MS * Math.pow(BACKOFF_MULTIPLIER, retryConsecutiveFailures.toDouble())).toLong(), + RETRY_MAX_BACKOFF_MS + ) + return } - for (item in toRetry) { - // Server expects flat array of detections, each with device_id embedded - val request = item.detections.map { it.toAdaMapsDetection(item.deviceId) } + // Take one batch at a time (oldest first) to avoid blocking on large queues + val item = pendingQueue.poll() ?: return - when (val result = adamapsClient.ingest(request)) { - is ApiResult.Success -> { - _stats.update { - it.copy( - sent = it.sent + item.detections.size, - queued = it.queued - item.detections.size - ) - } - _adamapsReachable.value = true - } - is ApiResult.Error -> { - item.attempts++ - if (item.attempts < 5) { - // Re-queue for retry (max 5 attempts) - pendingQueue.add(item) - } else { - // Give up after 5 attempts, remove from queued count - _stats.update { it.copy(queued = it.queued - item.detections.size) } - _lastError.value = "Dropped ${item.detections.size} detections after 5 failed attempts" - } - _adamapsReachable.value = false + // Server expects flat array of detections, each with device_id embedded + val request = item.detections.map { it.toAdaMapsDetection(item.deviceId) } + + when (val result = adamapsClient.ingest(request)) { + is ApiResult.Success -> { + _stats.update { + it.copy( + sent = it.sent + 
item.detections.size, + queued = it.queued - item.detections.size + ) } + _adamapsReachable.value = true + // Reset backoff on success + retryConsecutiveFailures = 0 + retryBackoffMs = RETRY_MIN_BACKOFF_MS + _lastError.value = null + } + is ApiResult.Error -> { + // Always re-queue — never drop detections (except for max queue size) + pendingQueue.add(item) + _adamapsReachable.value = false + + // Increase backoff: 5s → 10s → 20s → 40s → 60s (capped) + retryConsecutiveFailures++ + retryBackoffMs = min( + (RETRY_MIN_BACKOFF_MS * Math.pow(BACKOFF_MULTIPLIER, retryConsecutiveFailures.toDouble())).toLong(), + RETRY_MAX_BACKOFF_MS + ) + + _lastError.value = "ADAMaps: ${result.message} (retry in ${retryBackoffMs / 1000}s)" } } }