cardano-checkout-py/cardano_checkout/monitor.py
Kayos a6d4ac8521 v0.2: refactor monitor + scheduler around InvoiceStore Protocol
Rewrite monitor.py so it operates entirely through the
cardano_checkout.store.InvoiceStore Protocol — no more SQLAlchemy
imports, no more CardanoPayment / PlatformConfig coupling. Same
behavioural shape: same Koios URL, same 15s check cadence, same 2%
confirm / overpay tolerances, same 3-reprice cap.

Rewrite scheduler.py as a reusable InvoiceScheduler dataclass that
wires two APScheduler jobs (check_pending every 15s, reprice_expired
every 60s) against a consumer-supplied store. The subscription +
grace-period jobs are TradeCraft-specific and get lifted into
tradecraft_compat.py verbatim so TradeCraft can still import them
during the migration window without any code change.

Add InMemoryStore reference implementation to store.py — used by the
test suite and handy for local dev / ephemeral workflows.

Bump version to 0.2.0-dev.
2026-04-23 19:55:28 -07:00

421 lines
15 KiB
Python

"""Cardano UTXO monitoring — framework-agnostic polling against :class:`InvoiceStore`.
Polls Koios for on-chain payments at the receive addresses of invoices that
are still in a non-terminal state, updates invoice status through the
:class:`InvoiceStore` Protocol, and optionally reprices expired invoices
against the current ADA/USD oracle snapshot.
Called by :mod:`cardano_checkout.scheduler` every 15 seconds for pending
payments, and every 60 seconds to reprice expired ones.
Koios endpoint used::
POST https://api.koios.rest/api/v1/address_utxos
Body: {"_addresses": ["addr1..."]}
Status transitions applied here::
PENDING ──► CONFIRMED (received >= expected * CONFIRM_TOLERANCE)
PENDING ──► UNDERPAID (received > 0 but below tolerance)
PENDING ──► OVERPAID (received >= expected * OVERPAY_THRESHOLD)
PENDING ──► EXPIRED (once the ``repriced_count`` metadata counter reaches the reprice cap — see reprice_expired_invoices)
Behavioral shape is identical to the original TradeCraft ``services/cardano_monitor.py``:
same polling intervals, same Koios URL, same 2% confirm / overpay tolerances.
The only change is that persistence is now delegated to the store Protocol
instead of being welded to SQLAlchemy + the ``CardanoPayment`` model.
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional
import httpx
from cardano_checkout.invoice import Invoice, InvoiceStatus
from cardano_checkout.oracles import (
KNOWN_TOKENS,
convert_token_to_lovelace,
convert_usd_to_lovelace,
get_ada_usd_price,
)
from cardano_checkout.store import InvoiceStore
# Module-level logger; every message emitted in this file is prefixed
# with "[cardano-monitor]".
logger = logging.getLogger(__name__)
# Default endpoint used by check_address_utxos / check_pending_invoices when
# the caller does not pass its own ``koios_url``.
KOIOS_URL = "https://api.koios.rest/api/v1/address_utxos"
# Default ``timeout`` handed to httpx.AsyncClient for each Koios request.
KOIOS_TIMEOUT = 15  # seconds
# Tolerance for confirming payment (2%) — unchanged from v0.1 / TradeCraft.
# Both values are multipliers applied to ``expected_lovelace`` in
# evaluate_utxos: a total at or above ``expected * CONFIRM_TOLERANCE`` is
# CONFIRMED, and a total at or above ``expected * OVERPAY_THRESHOLD`` is
# OVERPAID.
CONFIRM_TOLERANCE = 0.98
OVERPAY_THRESHOLD = 1.02
# Default reprice cap + window (matches TradeCraft defaults).
# Used as the default ``max_repricings`` and ``window_minutes`` arguments of
# reprice_expired_invoices.
DEFAULT_MAX_REPRICINGS = 3
DEFAULT_PAYMENT_WINDOW_MINUTES = 15
# ---------------------------------------------------------------------------
# Koios UTXO query
# ---------------------------------------------------------------------------
async def check_address_utxos(
    address: str, koios_url: str = KOIOS_URL, timeout: float = KOIOS_TIMEOUT
) -> list[dict]:
    """Query Koios for all UTXOs at ``address``.

    The request sets ``_extended`` to true. Per the Koios API documentation
    the optional per-UTXO fields (``asset_list`` among them) are only
    populated for extended requests; without the flag native-token payments
    would never reach :func:`evaluate_utxos`.

    Args:
        address: Receive address to look up. Only its first 20 characters
            are ever written to the log.
        koios_url: Full URL of the ``address_utxos`` endpoint.
        timeout: Timeout passed to :class:`httpx.AsyncClient`.

    Returns:
        A list of UTXO dicts (each with ``tx_hash``, ``tx_index``,
        ``value``, ``asset_list``), or an empty list on any error.
        Entries that are not dicts are dropped so callers can rely on
        ``.get`` being available on every element. Never raises.
    """
    # ``_extended`` is what makes Koios fill in ``asset_list``.
    request_body = {"_addresses": [address], "_extended": True}
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.post(
                koios_url,
                json=request_body,
                headers={"Accept": "application/json"},
            )
            resp.raise_for_status()
            data = resp.json()
        if not isinstance(data, list):
            logger.warning(
                "[cardano-monitor] Unexpected Koios response shape for %s",
                address[:20],
            )
            return []
        # Keep the ``list[dict]`` contract honest even if the gateway
        # returns stray non-object entries.
        return [utxo for utxo in data if isinstance(utxo, dict)]
    except httpx.HTTPStatusError as e:
        logger.error(
            "[cardano-monitor] Koios HTTP %s for %s: %s",
            e.response.status_code,
            address[:20],
            e.response.text[:200],
        )
        return []
    except httpx.TimeoutException:
        logger.warning("[cardano-monitor] Koios timeout for address %s", address[:20])
        return []
    except Exception as e:  # pragma: no cover — defensive
        # Covers JSON decode errors, connection failures, etc. A failed
        # lookup is reported as "no UTXOs" so the poll loop keeps going.
        logger.error(
            "[cardano-monitor] Koios unexpected error for %s: %s", address[:20], e
        )
        return []


# Backwards-compatible alias — monitor.py in TradeCraft imports the private name.
# Keeping a leading-underscore alias so the TradeCraft shim can still reach it.
_check_address_utxos = check_address_utxos
# ---------------------------------------------------------------------------
# UTXO evaluation
# ---------------------------------------------------------------------------
async def evaluate_utxos(
    expected_lovelace: int, utxos: list[dict]
) -> tuple[InvoiceStatus, int, int, dict, Optional[str]]:
    """Work out which status an invoice should move to given its UTXOs.

    Args:
        expected_lovelace: Amount the invoice is asking for, in lovelace.
        utxos: UTXO dicts as returned by :func:`check_address_utxos`.

    Returns:
        ``(status, raw_lovelace, total_value_lovelace, received_assets,
        latest_tx_hash)`` where

        - ``status`` is the :class:`InvoiceStatus` to transition into;
          ``PENDING`` means "no change, keep polling".
        - ``raw_lovelace`` is the sum of the UTXO ``value`` fields (pure ADA).
        - ``total_value_lovelace`` is ``raw_lovelace`` plus whatever
          :func:`convert_token_to_lovelace` reports for each native asset.
          An asset whose conversion raises or returns ``None`` adds 0.
        - ``received_assets`` maps ``"policy_id.asset_name"`` to the summed
          positive quantity.
        - ``latest_tx_hash`` is the ``tx_hash`` of the last UTXO in list
          order that carries one, or ``None``.

    Status rules:
        - no UTXOs → ``PENDING``
        - ``expected_lovelace == 0`` → ``CONFIRMED`` if anything of value
          arrived, else ``PENDING``
        - ``total >= expected * OVERPAY_THRESHOLD`` → ``OVERPAID``
        - ``total >= expected * CONFIRM_TOLERANCE`` → ``CONFIRMED``
        - ``total > 0`` → ``UNDERPAID``
        - otherwise → ``PENDING``
    """
    if not utxos:
        return InvoiceStatus.PENDING, 0, 0, {}, None

    ada_received = 0
    asset_totals: dict[str, int] = {}
    last_seen_tx: Optional[str] = None

    # Pass 1: add up ADA and native-asset quantities across all UTXOs.
    for entry in utxos:
        try:
            entry_lovelace = int(entry.get("value", 0))
        except (ValueError, TypeError):
            # Unparseable value: count this UTXO as carrying no ADA.
            entry_lovelace = 0
        ada_received += entry_lovelace

        # Later entries win; entries without a hash leave the previous one.
        last_seen_tx = entry.get("tx_hash") or last_seen_tx

        for token in entry.get("asset_list") or []:
            key = f"{token.get('policy_id', '')}.{token.get('asset_name', '')}"
            try:
                amount = int(token.get("quantity", 0))
            except (ValueError, TypeError):
                continue
            if amount > 0:
                asset_totals[key] = asset_totals.get(key, 0) + amount

    # Pass 2: price every native asset in lovelace via the oracle helper.
    token_value = 0
    for key, amount in asset_totals.items():
        policy_id, dot, asset_name_hex = key.partition(".")
        if not dot:
            continue
        # Decimals come from the first KNOWN_TOKENS entry with a matching
        # policy id; unknown tokens are treated as having 0 decimals.
        decimals = next(
            (
                info.get("decimals", 0)
                for info in KNOWN_TOKENS.values()
                if info.get("policy_id") == policy_id
            ),
            0,
        )
        try:
            converted = await convert_token_to_lovelace(
                policy_id, asset_name_hex, amount, decimals
            )
            if converted is not None:
                token_value += converted
        except Exception as e:
            logger.warning(
                "[cardano-monitor] Failed to convert asset %s to lovelace: %s",
                key[:20],
                e,
            )

    total_value = ada_received + token_value

    # Classify, checking the highest threshold first.
    verdict = InvoiceStatus.PENDING
    if expected_lovelace == 0:
        # Degenerate case — any payment at all counts.
        if total_value > 0:
            verdict = InvoiceStatus.CONFIRMED
    elif total_value >= expected_lovelace * OVERPAY_THRESHOLD:
        verdict = InvoiceStatus.OVERPAID
    elif total_value >= expected_lovelace * CONFIRM_TOLERANCE:
        verdict = InvoiceStatus.CONFIRMED
    elif total_value > 0:
        verdict = InvoiceStatus.UNDERPAID

    return verdict, ada_received, total_value, asset_totals, last_seen_tx


# Backwards-compatible alias.
_evaluate_utxos = evaluate_utxos
# ---------------------------------------------------------------------------
# Main monitoring entrypoints (scheduled jobs call these)
# ---------------------------------------------------------------------------
async def check_pending_invoices(
    store: InvoiceStore,
    koios_url: str = KOIOS_URL,
    limit: int = 100,
) -> int:
    """Poll Koios once for every unexpired ``PENDING`` invoice.

    Each invoice whose ``expires_at`` is unset or still in the future has
    its receive address looked up, the UTXOs classified by
    :func:`evaluate_utxos`, and the outcome written back through ``store``.
    Invoices that are already past expiry are left alone here; they are
    handled by :func:`reprice_expired_invoices`.

    Args:
        store: Persistence backend.
        koios_url: Koios endpoint URL, forwarded to
            :func:`check_address_utxos`. Override for testnet / custom
            gateways.
        limit: Max pending invoices fetched from the store per poll.

    Returns:
        Number of invoices persisted via ``store.update`` this cycle.
    """
    # One timestamp per cycle: used for the expiry filter and as the
    # ``confirmed_at`` value of anything confirmed during this run.
    cycle_time = datetime.now(timezone.utc)

    pending = await store.list_by_status(InvoiceStatus.PENDING, limit=limit)
    live = (
        [
            inv
            for inv in pending
            if inv.expires_at is None or inv.expires_at > cycle_time
        ]
        if pending
        else []
    )
    if not live:
        return 0

    logger.debug("[cardano-monitor] Checking %d pending invoice(s)", len(live))

    persisted = 0
    for inv in live:
        # One failing invoice must not stop the rest of the batch.
        try:
            utxos = await check_address_utxos(inv.receive_address, koios_url=koios_url)
            verdict, ada_lovelace, valued_lovelace, _assets, newest_tx = (
                await evaluate_utxos(inv.expected_lovelace, utxos)
            )

            if verdict == inv.status and ada_lovelace == 0:
                # Same status and no ADA seen — nothing to persist.
                continue

            inv.received_lovelace = ada_lovelace

            if newest_tx and newest_tx not in inv.tx_hashes:
                inv.tx_hashes.append(newest_tx)
                # record_tx is idempotent per contract — call it so the
                # store can persist the per-utxo history however it wants.
                # A failure here is logged but does not block the update.
                try:
                    await store.record_tx(
                        inv.id, newest_tx, lovelace_delta=ada_lovelace
                    )
                except Exception as e:
                    logger.warning(
                        "[cardano-monitor] record_tx failed for %s/%s: %s",
                        inv.id,
                        newest_tx[:12],
                        e,
                    )

            if verdict != inv.status:
                previous = inv.status
                inv.status = verdict
                if verdict in (InvoiceStatus.CONFIRMED, InvoiceStatus.OVERPAID):
                    inv.confirmed_at = cycle_time
                logger.info(
                    "[cardano-monitor] invoice=%s merchant=%s: %s -> %s "
                    "(%.6f ADA received, %.6f ADA total value)",
                    inv.id,
                    inv.merchant_id,
                    previous.value,
                    verdict.value,
                    ada_lovelace / 1_000_000,
                    valued_lovelace / 1_000_000,
                )

            await store.update(inv)
            persisted += 1
        except Exception as e:
            logger.exception(
                "[cardano-monitor] Error checking invoice %s: %s", inv.id, e
            )

    return persisted
async def reprice_expired_invoices(
    store: InvoiceStore,
    window_minutes: int = DEFAULT_PAYMENT_WINDOW_MINUTES,
    max_repricings: int = DEFAULT_MAX_REPRICINGS,
    limit: int = 100,
) -> int:
    """Reprice PENDING invoices whose expiry has passed.

    For every ``PENDING`` invoice with ``expires_at <= now``:

    - if ``metadata["repriced_count"]`` has reached ``max_repricings``, or
      the invoice has no positive ``usd_amount``, it is moved to
      :class:`InvoiceStatus.EXPIRED`;
    - otherwise ``expected_lovelace`` is recomputed from ``usd_amount`` via
      :func:`convert_usd_to_lovelace`, ``expires_at`` is reset to
      ``now + window_minutes``, ``metadata["repriced_count"]`` is
      incremented and ``metadata["ada_price_usd"]`` is set to the ADA/USD
      price fetched at the start of the cycle.

    The two EXPIRED transitions need no price, so they are applied even
    when the ADA/USD oracle is unavailable; only the repricing step is
    skipped in that case and retried on the next cycle.

    Args:
        store: Persistence backend.
        window_minutes: New expiry window per reprice. Matches TradeCraft's
            platform-config-driven value of 15 minutes by default.
        max_repricings: Give-up threshold. TradeCraft default is 3.
        limit: Max pending invoices to fetch per call.

    Returns:
        Number of invoices updated (repriced or expired) this cycle.
    """
    now = datetime.now(timezone.utc)
    pending = await store.list_by_status(InvoiceStatus.PENDING, limit=limit)
    if not pending:
        return 0

    expired_candidates = [
        inv for inv in pending if inv.expires_at is not None and inv.expires_at <= now
    ]
    if not expired_candidates:
        return 0

    logger.info(
        "[cardano-monitor] Repricing %d expired invoice(s)", len(expired_candidates)
    )

    ada_price = await get_ada_usd_price()
    # ``None`` and NaN are treated as "no price", as are zero and negatives.
    price_available = ada_price is not None and ada_price > 0
    if not price_available:
        logger.warning(
            "[cardano-monitor] Cannot reprice — ADA price unavailable"
        )

    new_expires_at = now + timedelta(minutes=window_minutes)
    updated = 0
    for invoice in expired_candidates:
        # One failing invoice must not stop the rest of the batch.
        try:
            repriced_count = int(invoice.metadata.get("repriced_count", 0))

            # Terminal transitions first — neither depends on the oracle.
            if repriced_count >= max_repricings:
                invoice.status = InvoiceStatus.EXPIRED
                await store.update(invoice)
                updated += 1
                logger.info(
                    "[cardano-monitor] invoice %s expired after %d repricings",
                    invoice.id,
                    repriced_count,
                )
                continue

            usd_amount = float(invoice.usd_amount or 0)
            if usd_amount <= 0:
                # Nothing to reprice against, so give up on the invoice.
                invoice.status = InvoiceStatus.EXPIRED
                await store.update(invoice)
                updated += 1
                logger.warning(
                    "[cardano-monitor] invoice %s has no usd_amount — marking expired",
                    invoice.id,
                )
                continue

            if not price_available:
                # Leave the invoice untouched; it is retried next cycle.
                continue

            new_lovelace = await convert_usd_to_lovelace(usd_amount)
            if new_lovelace is None or new_lovelace <= 0:
                # Never persist a missing or non-positive target:
                # evaluate_utxos multiplies expected_lovelace by the
                # tolerance factors, so such a value would either raise or
                # classify any UTXO as paid.
                logger.warning(
                    "[cardano-monitor] invoice %s: lovelace conversion returned 0, skipping",
                    invoice.id,
                )
                continue

            old_lovelace = invoice.expected_lovelace
            invoice.expected_lovelace = new_lovelace
            invoice.expires_at = new_expires_at
            invoice.metadata["repriced_count"] = repriced_count + 1
            invoice.metadata["ada_price_usd"] = round(ada_price, 4)
            await store.update(invoice)
            updated += 1
            logger.info(
                "[cardano-monitor] Repriced invoice %s: %d -> %d lovelace "
                "(ADA=$%.4f, reprice #%d)",
                invoice.id,
                old_lovelace or 0,
                new_lovelace,
                ada_price,
                repriced_count + 1,
            )
        except Exception as e:
            logger.exception(
                "[cardano-monitor] Error repricing invoice %s: %s", invoice.id, e
            )

    return updated