v6: retry indefinitely with backoff, never drop detections
- Remove the 5-attempt drop limit — retry forever until success
- Exponential backoff for retries: 5s → 10s → 20s → 40s → 60s (capped)
- Only drop detections if the queue exceeds 2000 (memory protection)
- Drop oldest first when over the limit (FIFO eviction)
- Show retry countdown in the error message
- Reset backoff on successful send

Handles intermittent 502s from ADAMaps gracefully by holding the queue until the server recovers.
This commit is contained in:
parent
f06ce3f319
commit
2cebb4fb5d
1 changed files with 77 additions and 35 deletions
|
|
@ -41,11 +41,16 @@ class ForwardingService : LifecycleService() {
|
|||
const val ACTION_START = "com.adamaps.varroa.START_FORWARDING"
|
||||
const val ACTION_STOP = "com.adamaps.varroa.STOP_FORWARDING"
|
||||
|
||||
// Exponential backoff constants
|
||||
// Exponential backoff constants (Bee polling)
|
||||
private const val MIN_BACKOFF_MS = 5_000L // 5 seconds
|
||||
private const val MAX_BACKOFF_MS = 60_000L // 60 seconds
|
||||
private const val BACKOFF_MULTIPLIER = 2.0
|
||||
|
||||
// ADAMaps retry queue constants
|
||||
private const val RETRY_MIN_BACKOFF_MS = 5_000L // 5 seconds
|
||||
private const val RETRY_MAX_BACKOFF_MS = 60_000L // 60 seconds (cap)
|
||||
private const val MAX_QUEUE_SIZE = 2000 // Max detections to hold in memory
|
||||
|
||||
// Shared state exposed to ViewModels
|
||||
private val _stats = MutableStateFlow(SessionStats())
|
||||
val stats: StateFlow<SessionStats> = _stats.asStateFlow()
|
||||
|
|
@ -75,17 +80,18 @@ class ForwardingService : LifecycleService() {
|
|||
private var reachJob: Job? = null
|
||||
private var retryJob: Job? = null
|
||||
|
||||
// Exponential backoff state
|
||||
// Exponential backoff state (Bee polling)
|
||||
private var currentBackoffMs = MIN_BACKOFF_MS
|
||||
private var consecutiveFailures = 0
|
||||
|
||||
// Retry queue for failed ADAMaps sends
|
||||
private val pendingQueue = ConcurrentLinkedQueue<QueuedIngest>()
|
||||
private var retryBackoffMs = RETRY_MIN_BACKOFF_MS
|
||||
private var retryConsecutiveFailures = 0
|
||||
|
||||
private data class QueuedIngest(
|
||||
val deviceId: String,
|
||||
val detections: List<BeeDetection>,
|
||||
var attempts: Int = 0
|
||||
val detections: List<BeeDetection>
|
||||
)
|
||||
|
||||
override fun onCreate() {
|
||||
|
|
@ -173,7 +179,8 @@ class ForwardingService : LifecycleService() {
|
|||
retryJob?.cancel()
|
||||
retryJob = lifecycleScope.launch {
|
||||
while (true) {
|
||||
delay(10_000L) // Check retry queue every 10 seconds
|
||||
// Use exponential backoff: 5s → 10s → 20s → 40s → 60s (capped)
|
||||
delay(retryBackoffMs)
|
||||
processRetryQueue()
|
||||
}
|
||||
}
|
||||
|
|
@ -256,51 +263,86 @@ class ForwardingService : LifecycleService() {
|
|||
is ApiResult.Success -> {
|
||||
_stats.update { it.copy(sent = it.sent + detections.size, queued = it.queued - detections.size) }
|
||||
_adamapsReachable.value = true
|
||||
// Reset retry backoff on direct success
|
||||
retryConsecutiveFailures = 0
|
||||
retryBackoffMs = RETRY_MIN_BACKOFF_MS
|
||||
}
|
||||
is ApiResult.Error -> {
|
||||
_lastError.value = "ADAMaps: ${result.message}"
|
||||
_adamapsReachable.value = false
|
||||
// Add to retry queue instead of losing detections
|
||||
pendingQueue.add(QueuedIngest(deviceId, detections))
|
||||
// Keep queued count accurate
|
||||
|
||||
// Enforce max queue size — drop OLDEST if over limit
|
||||
enforceQueueLimit()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun processRetryQueue() {
|
||||
if (pendingQueue.isEmpty() || !_adamapsReachable.value) return
|
||||
/**
 * Keeps the in-memory retry queue bounded by [MAX_QUEUE_SIZE] detections.
 *
 * Eviction works on whole batches taken from the head of [pendingQueue]
 * (the oldest entries), so the queue may end up somewhat below the limit
 * rather than exactly at it. When anything is evicted, the `queued` counter
 * in the session stats is reduced by the number of evicted detections and
 * the last-error message is replaced with a "queue full" notice.
 */
private fun enforceQueueLimit() {
    // Total number of detections currently held across all queued batches.
    var remaining = pendingQueue.sumOf { batch -> batch.detections.size }
    var dropped = 0

    while (remaining > MAX_QUEUE_SIZE) {
        // poll() yields null once the queue has been drained, which ends the loop.
        val head = pendingQueue.poll() ?: break
        val batchSize = head.detections.size
        dropped += batchSize
        remaining -= batchSize
    }

    // Nothing was evicted: leave the stats and the error message untouched.
    if (dropped <= 0) return

    _stats.update { current -> current.copy(queued = current.queued - dropped) }
    _lastError.value = "Queue full: dropped $dropped oldest detections"
}
|
||||
|
||||
val toRetry = mutableListOf<QueuedIngest>()
|
||||
while (pendingQueue.isNotEmpty()) {
|
||||
pendingQueue.poll()?.let { toRetry.add(it) }
|
||||
private suspend fun processRetryQueue() {
|
||||
if (pendingQueue.isEmpty()) return
|
||||
|
||||
// Don't retry if ADAMaps is known to be down — wait for reachability check
|
||||
if (!_adamapsReachable.value) {
|
||||
// Increase backoff while waiting
|
||||
retryConsecutiveFailures++
|
||||
retryBackoffMs = min(
|
||||
(RETRY_MIN_BACKOFF_MS * Math.pow(BACKOFF_MULTIPLIER, retryConsecutiveFailures.toDouble())).toLong(),
|
||||
RETRY_MAX_BACKOFF_MS
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
for (item in toRetry) {
|
||||
// Server expects flat array of detections, each with device_id embedded
|
||||
val request = item.detections.map { it.toAdaMapsDetection(item.deviceId) }
|
||||
// Take one batch at a time (oldest first) to avoid blocking on large queues
|
||||
val item = pendingQueue.poll() ?: return
|
||||
|
||||
when (val result = adamapsClient.ingest(request)) {
|
||||
is ApiResult.Success -> {
|
||||
_stats.update {
|
||||
it.copy(
|
||||
sent = it.sent + item.detections.size,
|
||||
queued = it.queued - item.detections.size
|
||||
)
|
||||
}
|
||||
_adamapsReachable.value = true
|
||||
}
|
||||
is ApiResult.Error -> {
|
||||
item.attempts++
|
||||
if (item.attempts < 5) {
|
||||
// Re-queue for retry (max 5 attempts)
|
||||
pendingQueue.add(item)
|
||||
} else {
|
||||
// Give up after 5 attempts, remove from queued count
|
||||
_stats.update { it.copy(queued = it.queued - item.detections.size) }
|
||||
_lastError.value = "Dropped ${item.detections.size} detections after 5 failed attempts"
|
||||
}
|
||||
_adamapsReachable.value = false
|
||||
// Server expects flat array of detections, each with device_id embedded
|
||||
val request = item.detections.map { it.toAdaMapsDetection(item.deviceId) }
|
||||
|
||||
when (val result = adamapsClient.ingest(request)) {
|
||||
is ApiResult.Success -> {
|
||||
_stats.update {
|
||||
it.copy(
|
||||
sent = it.sent + item.detections.size,
|
||||
queued = it.queued - item.detections.size
|
||||
)
|
||||
}
|
||||
_adamapsReachable.value = true
|
||||
// Reset backoff on success
|
||||
retryConsecutiveFailures = 0
|
||||
retryBackoffMs = RETRY_MIN_BACKOFF_MS
|
||||
_lastError.value = null
|
||||
}
|
||||
is ApiResult.Error -> {
|
||||
// Always re-queue — never drop detections (except for max queue size)
|
||||
pendingQueue.add(item)
|
||||
_adamapsReachable.value = false
|
||||
|
||||
// Increase backoff: 5s → 10s → 20s → 40s → 60s (capped)
|
||||
retryConsecutiveFailures++
|
||||
retryBackoffMs = min(
|
||||
(RETRY_MIN_BACKOFF_MS * Math.pow(BACKOFF_MULTIPLIER, retryConsecutiveFailures.toDouble())).toLong(),
|
||||
RETRY_MAX_BACKOFF_MS
|
||||
)
|
||||
|
||||
_lastError.value = "ADAMaps: ${result.message} (retry in ${retryBackoffMs / 1000}s)"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue