cardano-api: strip 'Fix #N:' audit-ticket prefixes from inline comments (was 50+ in main.py), drop hardening-pass changelog blocks from module docstring, rewrite README to drop deploy paths + marketing sections, keep tier/auth/TTL + policy IDs. cardano-checkout-py: drop TradeCraft lineage refs, swap chromaticcraft/tradecraft test fixtures for acme/globex, repository URL → git.sulkta.com.
403 lines
14 KiB
Python
403 lines
14 KiB
Python
"""Cardano UTXO monitoring — framework-agnostic polling against :class:`InvoiceStore`.
|
|
|
|
Polls Koios for on-chain payments at the receive addresses of invoices that
|
|
are still in a non-terminal state, updates invoice status through the
|
|
:class:`InvoiceStore` Protocol, and optionally reprices expired invoices
|
|
against the current ADA/USD oracle snapshot.
|
|
|
|
Called by :mod:`cardano_checkout.scheduler` every 15 seconds for pending
|
|
payments, and every 60 seconds to reprice expired ones.
|
|
|
|
Koios endpoint used::
|
|
|
|
POST https://api.koios.rest/api/v1/address_utxos
|
|
Body: {"_addresses": ["addr1..."]}
|
|
|
|
Status transitions::
|
|
|
|
PENDING ──► CONFIRMED (received >= expected * CONFIRM_TOLERANCE)
|
|
PENDING ──► UNDERPAID (received > 0 but below tolerance)
|
|
PENDING ──► OVERPAID (received >= expected * OVERPAY_THRESHOLD)
|
|
PENDING ──► EXPIRED (after reprice_count exhausts — see reprice_expired_invoices)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Awaitable, Callable, Optional
|
|
|
|
import httpx
|
|
|
|
from cardano_checkout.invoice import Invoice, InvoiceStatus
|
|
from cardano_checkout.store import InvoiceStore
|
|
|
|
# Consumer-supplied pricing callable: takes a USD amount (float),
|
|
# returns the current-market lovelace equivalent as int. Invoked by
|
|
# :func:`reprice_expired_invoices` to generate fresh quotes when an
|
|
# invoice's quote window lapses without payment.
|
|
PriceFn = Callable[[float], Awaitable[int]]
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
KOIOS_URL = "https://api.koios.rest/api/v1/address_utxos"
|
|
KOIOS_TIMEOUT = 15 # seconds
|
|
|
|
# Tolerance for confirming payment (2%).
|
|
CONFIRM_TOLERANCE = 0.98
|
|
OVERPAY_THRESHOLD = 1.02
|
|
|
|
# Default reprice cap + window.
|
|
DEFAULT_MAX_REPRICINGS = 3
|
|
DEFAULT_PAYMENT_WINDOW_MINUTES = 15
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Koios UTXO query
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def check_address_utxos(
|
|
address: str, koios_url: str = KOIOS_URL, timeout: float = KOIOS_TIMEOUT
|
|
) -> list[dict]:
|
|
"""Query Koios for all UTXOs at ``address``.
|
|
|
|
Returns a list of UTXO dicts (each with ``tx_hash``, ``tx_index``,
|
|
``value``, ``asset_list``) or an empty list on any error. Never raises.
|
|
"""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=timeout) as client:
|
|
resp = await client.post(
|
|
koios_url,
|
|
json={"_addresses": [address]},
|
|
headers={"Accept": "application/json"},
|
|
)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
if not isinstance(data, list):
|
|
logger.warning(
|
|
"[cardano-monitor] Unexpected Koios response shape for %s",
|
|
address[:20],
|
|
)
|
|
return []
|
|
return data
|
|
except httpx.HTTPStatusError as e:
|
|
logger.error(
|
|
"[cardano-monitor] Koios HTTP %s for %s: %s",
|
|
e.response.status_code,
|
|
address[:20],
|
|
e.response.text[:200],
|
|
)
|
|
return []
|
|
except httpx.TimeoutException:
|
|
logger.warning("[cardano-monitor] Koios timeout for address %s", address[:20])
|
|
return []
|
|
except Exception as e: # pragma: no cover — defensive
|
|
logger.error(
|
|
"[cardano-monitor] Koios unexpected error for %s: %s", address[:20], e
|
|
)
|
|
return []
|
|
|
|
|
|
# Leading-underscore alias kept for callers that imported the private name.
|
|
_check_address_utxos = check_address_utxos
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# UTXO evaluation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def evaluate_utxos(
|
|
expected_lovelace: int, utxos: list[dict]
|
|
) -> tuple[InvoiceStatus, int, int, dict, Optional[str]]:
|
|
"""Classify a batch of UTXOs against an expected-lovelace target.
|
|
|
|
Returns:
|
|
Tuple of ``(status, raw_lovelace, total_value_lovelace, received_assets, latest_tx_hash)``.
|
|
|
|
- ``status`` — :class:`InvoiceStatus` the invoice should transition into.
|
|
``PENDING`` means "no change, keep polling".
|
|
- ``raw_lovelace`` — pure ADA received (excluding native-asset value).
|
|
- ``total_value_lovelace`` — ADA + ADA-equivalent of native assets via DexHunter.
|
|
- ``received_assets`` — ``{policy_id.asset_name_hex: quantity}``.
|
|
- ``latest_tx_hash`` — most recent observed tx hash, or None if no UTXOs.
|
|
|
|
Status rules:
|
|
|
|
- No UTXOs → ``PENDING`` (no change)
|
|
- ``total_value >= expected * OVERPAY_THRESHOLD`` → ``OVERPAID`` (treated as confirmed)
|
|
- ``total_value >= expected * CONFIRM_TOLERANCE`` → ``CONFIRMED``
|
|
- ``total_value > 0`` but below tolerance → ``UNDERPAID``
|
|
"""
|
|
if not utxos:
|
|
return InvoiceStatus.PENDING, 0, 0, {}, None
|
|
|
|
raw_lovelace = 0
|
|
received_assets: dict[str, int] = {}
|
|
latest_tx_hash: Optional[str] = None
|
|
|
|
for utxo in utxos:
|
|
try:
|
|
raw_lovelace += int(utxo.get("value", 0))
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
tx = utxo.get("tx_hash")
|
|
if tx:
|
|
latest_tx_hash = tx
|
|
|
|
for asset in utxo.get("asset_list", []) or []:
|
|
policy_id = asset.get("policy_id", "")
|
|
asset_name = asset.get("asset_name", "")
|
|
asset_id = f"{policy_id}.{asset_name}"
|
|
try:
|
|
qty = int(asset.get("quantity", 0))
|
|
except (ValueError, TypeError):
|
|
qty = 0
|
|
if qty > 0:
|
|
received_assets[asset_id] = received_assets.get(asset_id, 0) + qty
|
|
|
|
# ADA-only matching. Native tokens in the same UTxOs are recorded in
|
|
# received_assets for visibility but do NOT contribute to the
|
|
# payment-matched total. Wrap this function with your own
|
|
# asset-to-lovelace converter to accept native tokens.
|
|
total_value = raw_lovelace
|
|
|
|
if expected_lovelace == 0:
|
|
# Degenerate case — any payment at all counts.
|
|
new_status = InvoiceStatus.CONFIRMED if total_value > 0 else InvoiceStatus.PENDING
|
|
elif total_value >= expected_lovelace * OVERPAY_THRESHOLD:
|
|
new_status = InvoiceStatus.OVERPAID
|
|
elif total_value >= expected_lovelace * CONFIRM_TOLERANCE:
|
|
new_status = InvoiceStatus.CONFIRMED
|
|
elif total_value > 0:
|
|
new_status = InvoiceStatus.UNDERPAID
|
|
else:
|
|
new_status = InvoiceStatus.PENDING
|
|
|
|
return new_status, raw_lovelace, total_value, received_assets, latest_tx_hash
|
|
|
|
|
|
# Leading-underscore alias for callers that imported the private name.
|
|
_evaluate_utxos = evaluate_utxos
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main monitoring entrypoints (scheduled jobs call these)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def check_pending_invoices(
|
|
store: InvoiceStore,
|
|
koios_url: str = KOIOS_URL,
|
|
limit: int = 100,
|
|
) -> int:
|
|
"""Check every :class:`InvoiceStatus.PENDING` invoice for on-chain payment.
|
|
|
|
For each pending invoice whose expiry has not yet passed, query Koios
|
|
for the UTXOs at its receive address, evaluate them against
|
|
``expected_lovelace``, and transition the invoice state accordingly.
|
|
|
|
Args:
|
|
store: Persistence backend.
|
|
koios_url: Koios base URL. Override for testnet / custom gateways.
|
|
limit: Max pending invoices to process per poll.
|
|
|
|
Returns:
|
|
Number of invoices updated this cycle (for logging / metrics).
|
|
"""
|
|
now = datetime.now(timezone.utc)
|
|
pending = await store.list_by_status(InvoiceStatus.PENDING, limit=limit)
|
|
if not pending:
|
|
return 0
|
|
|
|
# Filter out already-expired invoices — those get picked up by reprice_expired_invoices.
|
|
active = [
|
|
inv for inv in pending if inv.expires_at is None or inv.expires_at > now
|
|
]
|
|
if not active:
|
|
return 0
|
|
|
|
logger.debug("[cardano-monitor] Checking %d pending invoice(s)", len(active))
|
|
|
|
updates = 0
|
|
for invoice in active:
|
|
try:
|
|
utxos = await check_address_utxos(invoice.receive_address, koios_url=koios_url)
|
|
(
|
|
new_status,
|
|
raw_lovelace,
|
|
total_value,
|
|
_received_assets,
|
|
tx_hash,
|
|
) = await evaluate_utxos(invoice.expected_lovelace, utxos)
|
|
|
|
if new_status == invoice.status and raw_lovelace == 0:
|
|
# No change and no UTXOs — nothing to persist.
|
|
continue
|
|
|
|
invoice.received_lovelace = raw_lovelace
|
|
if tx_hash and tx_hash not in invoice.tx_hashes:
|
|
invoice.tx_hashes.append(tx_hash)
|
|
# record_tx is idempotent per contract — call it so the store
|
|
# can persist the per-utxo history however it wants.
|
|
try:
|
|
await store.record_tx(
|
|
invoice.id, tx_hash, lovelace_delta=raw_lovelace
|
|
)
|
|
except Exception as e:
|
|
logger.warning(
|
|
"[cardano-monitor] record_tx failed for %s/%s: %s",
|
|
invoice.id,
|
|
tx_hash[:12],
|
|
e,
|
|
)
|
|
|
|
if new_status != invoice.status:
|
|
old_status = invoice.status
|
|
invoice.status = new_status
|
|
|
|
if new_status in (InvoiceStatus.CONFIRMED, InvoiceStatus.OVERPAID):
|
|
invoice.confirmed_at = now
|
|
|
|
logger.info(
|
|
"[cardano-monitor] invoice=%s merchant=%s: %s -> %s "
|
|
"(%.6f ADA received, %.6f ADA total value)",
|
|
invoice.id,
|
|
invoice.merchant_id,
|
|
old_status.value,
|
|
new_status.value,
|
|
raw_lovelace / 1_000_000,
|
|
total_value / 1_000_000,
|
|
)
|
|
|
|
await store.update(invoice)
|
|
updates += 1
|
|
|
|
except Exception as e:
|
|
logger.exception(
|
|
"[cardano-monitor] Error checking invoice %s: %s", invoice.id, e
|
|
)
|
|
|
|
return updates
|
|
|
|
|
|
async def reprice_expired_invoices(
|
|
store: InvoiceStore,
|
|
*,
|
|
price_fn: PriceFn,
|
|
window_minutes: int = DEFAULT_PAYMENT_WINDOW_MINUTES,
|
|
max_repricings: int = DEFAULT_MAX_REPRICINGS,
|
|
limit: int = 100,
|
|
) -> int:
|
|
"""Reprice PENDING invoices whose expiry has passed.
|
|
|
|
Calls the consumer-supplied ``price_fn(usd_amount) -> lovelace`` to
|
|
recompute ``expected_lovelace`` at current market. Resets ``expires_at``
|
|
to ``now + window_minutes`` and increments ``invoice.metadata["repriced_count"]``.
|
|
After ``max_repricings`` the invoice transitions to
|
|
:class:`InvoiceStatus.EXPIRED`.
|
|
|
|
Args:
|
|
store: Persistence backend.
|
|
price_fn: Async callable that takes a USD amount and returns the
|
|
current lovelace equivalent. Example::
|
|
|
|
from cardano_checkout.monitor import reprice_expired_invoices
|
|
|
|
async def my_price_fn(usd: float) -> int:
|
|
rate = await coingecko_fetch_ada_usd()
|
|
return int(round(usd / rate * 1_000_000))
|
|
|
|
await reprice_expired_invoices(store, price_fn=my_price_fn)
|
|
window_minutes: New expiry window per reprice. Default 15.
|
|
max_repricings: Give-up threshold. Default 3.
|
|
limit: Max pending invoices to process per call.
|
|
|
|
Returns:
|
|
Number of invoices updated (repriced or expired) this cycle.
|
|
"""
|
|
now = datetime.now(timezone.utc)
|
|
pending = await store.list_by_status(InvoiceStatus.PENDING, limit=limit)
|
|
if not pending:
|
|
return 0
|
|
|
|
expired_candidates = [
|
|
inv for inv in pending if inv.expires_at is not None and inv.expires_at <= now
|
|
]
|
|
if not expired_candidates:
|
|
return 0
|
|
|
|
logger.info(
|
|
"[cardano-monitor] Repricing %d expired invoice(s)", len(expired_candidates)
|
|
)
|
|
|
|
new_expires_at = now + timedelta(minutes=window_minutes)
|
|
updated = 0
|
|
|
|
for invoice in expired_candidates:
|
|
try:
|
|
repriced_count = int(invoice.metadata.get("repriced_count", 0))
|
|
|
|
if repriced_count >= max_repricings:
|
|
invoice.status = InvoiceStatus.EXPIRED
|
|
await store.update(invoice)
|
|
updated += 1
|
|
logger.info(
|
|
"[cardano-monitor] invoice %s expired after %d repricings",
|
|
invoice.id,
|
|
repriced_count,
|
|
)
|
|
continue
|
|
|
|
usd_amount = float(invoice.usd_amount or 0)
|
|
if usd_amount <= 0:
|
|
invoice.status = InvoiceStatus.EXPIRED
|
|
await store.update(invoice)
|
|
updated += 1
|
|
logger.warning(
|
|
"[cardano-monitor] invoice %s has no usd_amount — marking expired",
|
|
invoice.id,
|
|
)
|
|
continue
|
|
|
|
try:
|
|
new_lovelace = await price_fn(usd_amount)
|
|
except Exception as e:
|
|
logger.warning(
|
|
"[cardano-monitor] invoice %s: price_fn raised %s, skipping",
|
|
invoice.id,
|
|
e,
|
|
)
|
|
continue
|
|
if new_lovelace <= 0:
|
|
logger.warning(
|
|
"[cardano-monitor] invoice %s: price_fn returned %d, skipping",
|
|
invoice.id,
|
|
new_lovelace,
|
|
)
|
|
continue
|
|
|
|
old_lovelace = invoice.expected_lovelace
|
|
invoice.expected_lovelace = new_lovelace
|
|
invoice.expires_at = new_expires_at
|
|
invoice.metadata["repriced_count"] = repriced_count + 1
|
|
|
|
await store.update(invoice)
|
|
updated += 1
|
|
|
|
logger.info(
|
|
"[cardano-monitor] Repriced invoice %s: %d -> %d lovelace (reprice #%d)",
|
|
invoice.id,
|
|
old_lovelace or 0,
|
|
new_lovelace,
|
|
repriced_count + 1,
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.exception(
|
|
"[cardano-monitor] Error repricing invoice %s: %s", invoice.id, e
|
|
)
|
|
|
|
return updated
|