cardano-checkout-py/cardano_checkout/monitor.py
Cobb Hayes c592a58148 Public-flip audit: drop audit-ticket prefixes + topology refs + AI scaffolding
cardano-api: strip 'Fix #N:' audit-ticket prefixes from inline comments (was
50+ in main.py), drop hardening-pass changelog blocks from module docstring,
rewrite README to drop deploy paths + marketing sections, keep tier/auth/TTL
+ policy IDs.

cardano-checkout-py: drop TradeCraft lineage refs, swap chromaticcraft/tradecraft
test fixtures for acme/globex, repository URL → git.sulkta.com.
2026-05-27 11:15:03 -07:00

403 lines
14 KiB
Python

"""Cardano UTXO monitoring — framework-agnostic polling against :class:`InvoiceStore`.
Polls Koios for on-chain payments at the receive addresses of invoices that
are still in a non-terminal state, updates invoice status through the
:class:`InvoiceStore` Protocol, and optionally reprices expired invoices
against the current ADA/USD oracle snapshot.
Called by :mod:`cardano_checkout.scheduler` every 15 seconds for pending
payments, and every 60 seconds to reprice expired ones.
Koios endpoint used::
POST https://api.koios.rest/api/v1/address_utxos
Body: {"_addresses": ["addr1..."]}
Status transitions::
PENDING ──► CONFIRMED (received >= expected * CONFIRM_TOLERANCE)
PENDING ──► UNDERPAID (received > 0 but below tolerance)
PENDING ──► OVERPAID (received >= expected * OVERPAY_THRESHOLD)
PENDING ──► EXPIRED (after reprice_count exhausts — see reprice_expired_invoices)
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from typing import Awaitable, Callable, Optional
import httpx
from cardano_checkout.invoice import Invoice, InvoiceStatus
from cardano_checkout.store import InvoiceStore
# Consumer-supplied pricing callable: takes a USD amount (float),
# returns the current-market lovelace equivalent as int. Invoked by
# :func:`reprice_expired_invoices` to generate fresh quotes when an
# invoice's quote window lapses without payment.
PriceFn = Callable[[float], Awaitable[int]]
logger = logging.getLogger(__name__)
KOIOS_URL = "https://api.koios.rest/api/v1/address_utxos"
KOIOS_TIMEOUT = 15 # seconds
# Tolerance for confirming payment (2%).
CONFIRM_TOLERANCE = 0.98
OVERPAY_THRESHOLD = 1.02
# Default reprice cap + window.
DEFAULT_MAX_REPRICINGS = 3
DEFAULT_PAYMENT_WINDOW_MINUTES = 15
# ---------------------------------------------------------------------------
# Koios UTXO query
# ---------------------------------------------------------------------------
async def check_address_utxos(
address: str, koios_url: str = KOIOS_URL, timeout: float = KOIOS_TIMEOUT
) -> list[dict]:
"""Query Koios for all UTXOs at ``address``.
Returns a list of UTXO dicts (each with ``tx_hash``, ``tx_index``,
``value``, ``asset_list``) or an empty list on any error. Never raises.
"""
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(
koios_url,
json={"_addresses": [address]},
headers={"Accept": "application/json"},
)
resp.raise_for_status()
data = resp.json()
if not isinstance(data, list):
logger.warning(
"[cardano-monitor] Unexpected Koios response shape for %s",
address[:20],
)
return []
return data
except httpx.HTTPStatusError as e:
logger.error(
"[cardano-monitor] Koios HTTP %s for %s: %s",
e.response.status_code,
address[:20],
e.response.text[:200],
)
return []
except httpx.TimeoutException:
logger.warning("[cardano-monitor] Koios timeout for address %s", address[:20])
return []
except Exception as e: # pragma: no cover — defensive
logger.error(
"[cardano-monitor] Koios unexpected error for %s: %s", address[:20], e
)
return []
# Leading-underscore alias kept for callers that imported the private name.
_check_address_utxos = check_address_utxos
# ---------------------------------------------------------------------------
# UTXO evaluation
# ---------------------------------------------------------------------------
async def evaluate_utxos(
expected_lovelace: int, utxos: list[dict]
) -> tuple[InvoiceStatus, int, int, dict, Optional[str]]:
"""Classify a batch of UTXOs against an expected-lovelace target.
Returns:
Tuple of ``(status, raw_lovelace, total_value_lovelace, received_assets, latest_tx_hash)``.
- ``status`` — :class:`InvoiceStatus` the invoice should transition into.
``PENDING`` means "no change, keep polling".
- ``raw_lovelace`` — pure ADA received (excluding native-asset value).
- ``total_value_lovelace`` — ADA + ADA-equivalent of native assets via DexHunter.
- ``received_assets`` — ``{policy_id.asset_name_hex: quantity}``.
- ``latest_tx_hash`` — most recent observed tx hash, or None if no UTXOs.
Status rules:
- No UTXOs → ``PENDING`` (no change)
- ``total_value >= expected * OVERPAY_THRESHOLD`` → ``OVERPAID`` (treated as confirmed)
- ``total_value >= expected * CONFIRM_TOLERANCE`` → ``CONFIRMED``
- ``total_value > 0`` but below tolerance → ``UNDERPAID``
"""
if not utxos:
return InvoiceStatus.PENDING, 0, 0, {}, None
raw_lovelace = 0
received_assets: dict[str, int] = {}
latest_tx_hash: Optional[str] = None
for utxo in utxos:
try:
raw_lovelace += int(utxo.get("value", 0))
except (ValueError, TypeError):
pass
tx = utxo.get("tx_hash")
if tx:
latest_tx_hash = tx
for asset in utxo.get("asset_list", []) or []:
policy_id = asset.get("policy_id", "")
asset_name = asset.get("asset_name", "")
asset_id = f"{policy_id}.{asset_name}"
try:
qty = int(asset.get("quantity", 0))
except (ValueError, TypeError):
qty = 0
if qty > 0:
received_assets[asset_id] = received_assets.get(asset_id, 0) + qty
# ADA-only matching. Native tokens in the same UTxOs are recorded in
# received_assets for visibility but do NOT contribute to the
# payment-matched total. Wrap this function with your own
# asset-to-lovelace converter to accept native tokens.
total_value = raw_lovelace
if expected_lovelace == 0:
# Degenerate case — any payment at all counts.
new_status = InvoiceStatus.CONFIRMED if total_value > 0 else InvoiceStatus.PENDING
elif total_value >= expected_lovelace * OVERPAY_THRESHOLD:
new_status = InvoiceStatus.OVERPAID
elif total_value >= expected_lovelace * CONFIRM_TOLERANCE:
new_status = InvoiceStatus.CONFIRMED
elif total_value > 0:
new_status = InvoiceStatus.UNDERPAID
else:
new_status = InvoiceStatus.PENDING
return new_status, raw_lovelace, total_value, received_assets, latest_tx_hash
# Leading-underscore alias for callers that imported the private name.
_evaluate_utxos = evaluate_utxos
# ---------------------------------------------------------------------------
# Main monitoring entrypoints (scheduled jobs call these)
# ---------------------------------------------------------------------------
async def check_pending_invoices(
store: InvoiceStore,
koios_url: str = KOIOS_URL,
limit: int = 100,
) -> int:
"""Check every :class:`InvoiceStatus.PENDING` invoice for on-chain payment.
For each pending invoice whose expiry has not yet passed, query Koios
for the UTXOs at its receive address, evaluate them against
``expected_lovelace``, and transition the invoice state accordingly.
Args:
store: Persistence backend.
koios_url: Koios base URL. Override for testnet / custom gateways.
limit: Max pending invoices to process per poll.
Returns:
Number of invoices updated this cycle (for logging / metrics).
"""
now = datetime.now(timezone.utc)
pending = await store.list_by_status(InvoiceStatus.PENDING, limit=limit)
if not pending:
return 0
# Filter out already-expired invoices — those get picked up by reprice_expired_invoices.
active = [
inv for inv in pending if inv.expires_at is None or inv.expires_at > now
]
if not active:
return 0
logger.debug("[cardano-monitor] Checking %d pending invoice(s)", len(active))
updates = 0
for invoice in active:
try:
utxos = await check_address_utxos(invoice.receive_address, koios_url=koios_url)
(
new_status,
raw_lovelace,
total_value,
_received_assets,
tx_hash,
) = await evaluate_utxos(invoice.expected_lovelace, utxos)
if new_status == invoice.status and raw_lovelace == 0:
# No change and no UTXOs — nothing to persist.
continue
invoice.received_lovelace = raw_lovelace
if tx_hash and tx_hash not in invoice.tx_hashes:
invoice.tx_hashes.append(tx_hash)
# record_tx is idempotent per contract — call it so the store
# can persist the per-utxo history however it wants.
try:
await store.record_tx(
invoice.id, tx_hash, lovelace_delta=raw_lovelace
)
except Exception as e:
logger.warning(
"[cardano-monitor] record_tx failed for %s/%s: %s",
invoice.id,
tx_hash[:12],
e,
)
if new_status != invoice.status:
old_status = invoice.status
invoice.status = new_status
if new_status in (InvoiceStatus.CONFIRMED, InvoiceStatus.OVERPAID):
invoice.confirmed_at = now
logger.info(
"[cardano-monitor] invoice=%s merchant=%s: %s -> %s "
"(%.6f ADA received, %.6f ADA total value)",
invoice.id,
invoice.merchant_id,
old_status.value,
new_status.value,
raw_lovelace / 1_000_000,
total_value / 1_000_000,
)
await store.update(invoice)
updates += 1
except Exception as e:
logger.exception(
"[cardano-monitor] Error checking invoice %s: %s", invoice.id, e
)
return updates
async def reprice_expired_invoices(
store: InvoiceStore,
*,
price_fn: PriceFn,
window_minutes: int = DEFAULT_PAYMENT_WINDOW_MINUTES,
max_repricings: int = DEFAULT_MAX_REPRICINGS,
limit: int = 100,
) -> int:
"""Reprice PENDING invoices whose expiry has passed.
Calls the consumer-supplied ``price_fn(usd_amount) -> lovelace`` to
recompute ``expected_lovelace`` at current market. Resets ``expires_at``
to ``now + window_minutes`` and increments ``invoice.metadata["repriced_count"]``.
After ``max_repricings`` the invoice transitions to
:class:`InvoiceStatus.EXPIRED`.
Args:
store: Persistence backend.
price_fn: Async callable that takes a USD amount and returns the
current lovelace equivalent. Example::
from cardano_checkout.monitor import reprice_expired_invoices
async def my_price_fn(usd: float) -> int:
rate = await coingecko_fetch_ada_usd()
return int(round(usd / rate * 1_000_000))
await reprice_expired_invoices(store, price_fn=my_price_fn)
window_minutes: New expiry window per reprice. Default 15.
max_repricings: Give-up threshold. Default 3.
limit: Max pending invoices to process per call.
Returns:
Number of invoices updated (repriced or expired) this cycle.
"""
now = datetime.now(timezone.utc)
pending = await store.list_by_status(InvoiceStatus.PENDING, limit=limit)
if not pending:
return 0
expired_candidates = [
inv for inv in pending if inv.expires_at is not None and inv.expires_at <= now
]
if not expired_candidates:
return 0
logger.info(
"[cardano-monitor] Repricing %d expired invoice(s)", len(expired_candidates)
)
new_expires_at = now + timedelta(minutes=window_minutes)
updated = 0
for invoice in expired_candidates:
try:
repriced_count = int(invoice.metadata.get("repriced_count", 0))
if repriced_count >= max_repricings:
invoice.status = InvoiceStatus.EXPIRED
await store.update(invoice)
updated += 1
logger.info(
"[cardano-monitor] invoice %s expired after %d repricings",
invoice.id,
repriced_count,
)
continue
usd_amount = float(invoice.usd_amount or 0)
if usd_amount <= 0:
invoice.status = InvoiceStatus.EXPIRED
await store.update(invoice)
updated += 1
logger.warning(
"[cardano-monitor] invoice %s has no usd_amount — marking expired",
invoice.id,
)
continue
try:
new_lovelace = await price_fn(usd_amount)
except Exception as e:
logger.warning(
"[cardano-monitor] invoice %s: price_fn raised %s, skipping",
invoice.id,
e,
)
continue
if new_lovelace <= 0:
logger.warning(
"[cardano-monitor] invoice %s: price_fn returned %d, skipping",
invoice.id,
new_lovelace,
)
continue
old_lovelace = invoice.expected_lovelace
invoice.expected_lovelace = new_lovelace
invoice.expires_at = new_expires_at
invoice.metadata["repriced_count"] = repriced_count + 1
await store.update(invoice)
updated += 1
logger.info(
"[cardano-monitor] Repriced invoice %s: %d -> %d lovelace (reprice #%d)",
invoice.id,
old_lovelace or 0,
new_lovelace,
repriced_count + 1,
)
except Exception as e:
logger.exception(
"[cardano-monitor] Error repricing invoice %s: %s", invoice.id, e
)
return updated