diff --git a/cardano_checkout/__init__.py b/cardano_checkout/__init__.py index 5a7692b..bbdb5a1 100644 --- a/cardano_checkout/__init__.py +++ b/cardano_checkout/__init__.py @@ -21,17 +21,30 @@ For NFT minting see :mod:`cardano_checkout.mint`. from __future__ import annotations -__version__ = "0.1.0-dev" +__version__ = "0.2.0-dev" # Pure modules — stable API from extraction from cardano_checkout import addresses, oracles # noqa: F401 # Payment lifecycle from cardano_checkout.invoice import Invoice, InvoiceStatus # noqa: F401 -from cardano_checkout.store import InvoiceStore # noqa: F401 +from cardano_checkout.store import InMemoryStore, InvoiceStore # noqa: F401 + +# Monitoring + scheduling +from cardano_checkout.monitor import ( # noqa: F401 + check_pending_invoices, + reprice_expired_invoices, +) +from cardano_checkout.scheduler import InvoiceScheduler # noqa: F401 # NFT + IPFS -from cardano_checkout.mint import MintPolicy, mint_nft_cert # noqa: F401 +from cardano_checkout.mint import ( # noqa: F401 + MintPolicy, + UnsignedMint, + build_cip25_metadata, + mint_nft_cert, + submit_signed_tx, +) from cardano_checkout.ipfs import IPFSClient, pin_bytes # noqa: F401 __all__ = [ @@ -41,8 +54,15 @@ __all__ = [ "Invoice", "InvoiceStatus", "InvoiceStore", + "InMemoryStore", + "InvoiceScheduler", + "check_pending_invoices", + "reprice_expired_invoices", "MintPolicy", + "UnsignedMint", "mint_nft_cert", + "submit_signed_tx", + "build_cip25_metadata", "IPFSClient", "pin_bytes", ] diff --git a/cardano_checkout/monitor.py b/cardano_checkout/monitor.py index 317ade0..e79f4ba 100644 --- a/cardano_checkout/monitor.py +++ b/cardano_checkout/monitor.py @@ -1,123 +1,158 @@ -""" -Cardano UTXO Monitoring Service +"""Cardano UTXO monitoring — framework-agnostic polling against :class:`InvoiceStore`. -Polls Koios API to detect on-chain payments at derived Cardano addresses. -Called by the scheduler every 15 seconds for pending payments, and every -60 seconds to reprice expired payment requests. +Polls Koios for on-chain payments at the receive addresses of invoices that +are still in a non-terminal state, updates invoice status through the +:class:`InvoiceStore` Protocol, and optionally reprices expired invoices +against the current ADA/USD oracle snapshot. + +Called by :mod:`cardano_checkout.scheduler` every 15 seconds for pending +payments, and every 60 seconds to reprice expired ones. + +Koios endpoint used:: -Koios endpoint used: POST https://api.koios.rest/api/v1/address_utxos Body: {"_addresses": ["addr1..."]} -Status flow applied here: - pending -> confirmed (received >= expected * 0.98) - pending -> underpaid (received > 0 but < expected * 0.98) - pending -> overpaid (received >= expected * 1.02 — still confirmed) - pending -> expired (handled by reprice_expired_payments) +Status transitions applied here:: + + PENDING ──► CONFIRMED (received >= expected * CONFIRM_TOLERANCE) + PENDING ──► UNDERPAID (received > 0 but below tolerance) + PENDING ──► OVERPAID (received >= expected * OVERPAY_THRESHOLD) + PENDING ──► EXPIRED (after reprice_count exhausts — see reprice_expired_invoices) + +Behavioral shape is identical to the original TradeCraft ``services/cardano_monitor.py``: +same polling intervals, same Koios URL, same 2% confirm / overpay tolerances. +The only change is that persistence is now delegated to the store Protocol +instead of being welded to SQLAlchemy + the ``CardanoPayment`` model. """ + +from __future__ import annotations + import logging from datetime import datetime, timedelta, timezone -from decimal import Decimal from typing import Optional import httpx -from sqlalchemy import select -from sqlalchemy.ext.asyncio import AsyncSession -from models import CardanoPayment, Config, PlatformConfig -from services.cardano_price import ( - convert_token_to_lovelace, - get_ada_usd_price, - convert_usd_to_lovelace, +from cardano_checkout.invoice import Invoice, InvoiceStatus +from cardano_checkout.oracles import ( KNOWN_TOKENS, + convert_token_to_lovelace, + convert_usd_to_lovelace, + get_ada_usd_price, ) +from cardano_checkout.store import InvoiceStore logger = logging.getLogger(__name__) KOIOS_URL = "https://api.koios.rest/api/v1/address_utxos" KOIOS_TIMEOUT = 15 # seconds -# Tolerance for confirming payment (2%) +# Tolerance for confirming payment (2%) — unchanged from v0.1 / TradeCraft. CONFIRM_TOLERANCE = 0.98 OVERPAY_THRESHOLD = 1.02 +# Default reprice cap + window (matches TradeCraft defaults). +DEFAULT_MAX_REPRICINGS = 3 +DEFAULT_PAYMENT_WINDOW_MINUTES = 15 -# ============================================================================= -# Koios API -# ============================================================================= -async def _check_address_utxos(address: str) -> list[dict]: - """ - Query Koios for all UTXOs at the given Cardano address. +# --------------------------------------------------------------------------- +# Koios UTXO query +# --------------------------------------------------------------------------- - Returns a list of UTXO dicts from Koios, or an empty list on error. - Each UTXO has keys: tx_hash, tx_index, value (lovelace), asset_list. + +async def check_address_utxos( + address: str, koios_url: str = KOIOS_URL, timeout: float = KOIOS_TIMEOUT +) -> list[dict]: + """Query Koios for all UTXOs at ``address``. + + Returns a list of UTXO dicts (each with ``tx_hash``, ``tx_index``, + ``value``, ``asset_list``) or an empty list on any error. Never raises. """ try: - async with httpx.AsyncClient(timeout=KOIOS_TIMEOUT) as client: + async with httpx.AsyncClient(timeout=timeout) as client: resp = await client.post( - KOIOS_URL, + koios_url, json={"_addresses": [address]}, headers={"Accept": "application/json"}, ) resp.raise_for_status() data = resp.json() if not isinstance(data, list): - logger.warning("[cardano-monitor] Unexpected Koios response shape for %s", address[:20]) + logger.warning( + "[cardano-monitor] Unexpected Koios response shape for %s", + address[:20], + ) return [] return data - except httpx.HTTPStatusError as e: logger.error( "[cardano-monitor] Koios HTTP %s for %s: %s", - e.response.status_code, address[:20], e.response.text[:200], + e.response.status_code, + address[:20], + e.response.text[:200], ) return [] except httpx.TimeoutException: logger.warning("[cardano-monitor] Koios timeout for address %s", address[:20]) return [] - except Exception as e: - logger.error("[cardano-monitor] Koios unexpected error for %s: %s", address[:20], e) + except Exception as e: # pragma: no cover — defensive + logger.error( + "[cardano-monitor] Koios unexpected error for %s: %s", address[:20], e + ) return [] -# ============================================================================= -# Payment evaluation -# ============================================================================= +# Backwards-compatible alias — monitor.py in TradeCraft imports the private name. +# Keeping a leading-underscore alias so the TradeCraft shim can still reach it. +_check_address_utxos = check_address_utxos -async def _evaluate_payment(payment: CardanoPayment, utxos: list[dict]) -> tuple[str, int, int, dict, Optional[str]]: - """ - Evaluate UTXOs against the expected payment and determine new status. + +# --------------------------------------------------------------------------- +# UTXO evaluation +# --------------------------------------------------------------------------- + + +async def evaluate_utxos( + expected_lovelace: int, utxos: list[dict] +) -> tuple[InvoiceStatus, int, int, dict, Optional[str]]: + """Classify a batch of UTXOs against an expected-lovelace target. Returns: - (new_status, received_lovelace, total_value_lovelace, received_assets, tx_hash) + Tuple of ``(status, raw_lovelace, total_value_lovelace, received_assets, latest_tx_hash)``. - Status rules: - - No UTXOs -> "pending" (no change) - - total_value >= expected * OVERPAY_THRESHOLD -> "overpaid" (treated as confirmed) - - total_value >= expected * CONFIRM_TOLERANCE -> "confirmed" - - total_value > 0 but below tolerance -> "underpaid" + - ``status`` — :class:`InvoiceStatus` the invoice should transition into. + ``PENDING`` means "no change, keep polling". + - ``raw_lovelace`` — pure ADA received (excluding native-asset value). + - ``total_value_lovelace`` — ADA + ADA-equivalent of native assets via DexHunter. + - ``received_assets`` — ``{policy_id.asset_name_hex: quantity}``. + - ``latest_tx_hash`` — most recent observed tx hash, or None if no UTXOs. + + Status rules mirror TradeCraft exactly: + + - No UTXOs → ``PENDING`` (no change) + - ``total_value >= expected * OVERPAY_THRESHOLD`` → ``OVERPAID`` (treated as confirmed) + - ``total_value >= expected * CONFIRM_TOLERANCE`` → ``CONFIRMED`` + - ``total_value > 0`` but below tolerance → ``UNDERPAID`` """ if not utxos: - return "pending", 0, 0, {}, None + return InvoiceStatus.PENDING, 0, 0, {}, None raw_lovelace = 0 received_assets: dict[str, int] = {} latest_tx_hash: Optional[str] = None for utxo in utxos: - # Sum ADA (lovelace) try: raw_lovelace += int(utxo.get("value", 0)) except (ValueError, TypeError): pass - # Track latest tx_hash tx = utxo.get("tx_hash") if tx: latest_tx_hash = tx - # Collect native assets for asset in utxo.get("asset_list", []) or []: policy_id = asset.get("policy_id", "") asset_name = asset.get("asset_name", "") @@ -129,14 +164,13 @@ async def _evaluate_payment(payment: CardanoPayment, utxos: list[dict]) -> tuple if qty > 0: received_assets[asset_id] = received_assets.get(asset_id, 0) + qty - # Convert native assets to lovelace equivalent + # Convert native assets to lovelace equivalent via DexHunter. asset_lovelace = 0 for asset_id, qty in received_assets.items(): if "." not in asset_id: continue policy_id, asset_name_hex = asset_id.split(".", 1) - # Find matching known token for decimals decimals = 0 for token_info in KNOWN_TOKENS.values(): if token_info.get("policy_id") == policy_id: @@ -144,174 +178,244 @@ async def _evaluate_payment(payment: CardanoPayment, utxos: list[dict]) -> tuple break try: - lv = await convert_token_to_lovelace(policy_id, asset_name_hex, qty, decimals) + lv = await convert_token_to_lovelace( + policy_id, asset_name_hex, qty, decimals + ) if lv is not None: asset_lovelace += lv except Exception as e: - logger.warning("[cardano-monitor] Failed to convert asset %s to lovelace: %s", asset_id[:20], e) + logger.warning( + "[cardano-monitor] Failed to convert asset %s to lovelace: %s", + asset_id[:20], + e, + ) total_value = raw_lovelace + asset_lovelace - expected = payment.expected_lovelace or 0 - if expected == 0: - # Degenerate case — treat any payment as confirmed - new_status = "confirmed" - elif total_value >= expected * OVERPAY_THRESHOLD: - new_status = "overpaid" - elif total_value >= expected * CONFIRM_TOLERANCE: - new_status = "confirmed" + if expected_lovelace == 0: + # Degenerate case — any payment at all counts. + new_status = InvoiceStatus.CONFIRMED if total_value > 0 else InvoiceStatus.PENDING + elif total_value >= expected_lovelace * OVERPAY_THRESHOLD: + new_status = InvoiceStatus.OVERPAID + elif total_value >= expected_lovelace * CONFIRM_TOLERANCE: + new_status = InvoiceStatus.CONFIRMED elif total_value > 0: - new_status = "underpaid" + new_status = InvoiceStatus.UNDERPAID else: - new_status = "pending" + new_status = InvoiceStatus.PENDING return new_status, raw_lovelace, total_value, received_assets, latest_tx_hash -# ============================================================================= -# Main monitoring functions (called by scheduler) -# ============================================================================= +# Backwards-compatible alias. +_evaluate_utxos = evaluate_utxos -async def check_pending_payments(db: AsyncSession) -> None: - """ - Check all pending payments that haven't expired yet. - Queries Koios for UTXOs at each address. Updates payment status in place. +# --------------------------------------------------------------------------- +# Main monitoring entrypoints (scheduled jobs call these) +# --------------------------------------------------------------------------- + + +async def check_pending_invoices( + store: InvoiceStore, + koios_url: str = KOIOS_URL, + limit: int = 100, +) -> int: + """Check every :class:`InvoiceStatus.PENDING` invoice for on-chain payment. + + For each pending invoice whose expiry has not yet passed, query Koios + for the UTXOs at its receive address, evaluate them against + ``expected_lovelace``, and transition the invoice state accordingly. + + Args: + store: Persistence backend. + koios_url: Koios base URL. Override for testnet / custom gateways. + limit: Max pending invoices to process per poll. + + Returns: + Number of invoices updated this cycle (for logging / metrics). """ now = datetime.now(timezone.utc) + pending = await store.list_by_status(InvoiceStatus.PENDING, limit=limit) + if not pending: + return 0 - result = await db.execute( - select(CardanoPayment).where( - CardanoPayment.status == "pending", - CardanoPayment.expires_at > now, - ) - ) - payments = result.scalars().all() + # Filter out already-expired invoices — those get picked up by reprice_expired_invoices. + active = [ + inv for inv in pending if inv.expires_at is None or inv.expires_at > now + ] + if not active: + return 0 - if not payments: - return + logger.debug("[cardano-monitor] Checking %d pending invoice(s)", len(active)) - logger.debug("[cardano-monitor] Checking %d pending payment(s)", len(payments)) - - for payment in payments: + updates = 0 + for invoice in active: try: - utxos = await _check_address_utxos(payment.address) - new_status, raw_lovelace, total_value, received_assets, tx_hash = await _evaluate_payment(payment, utxos) + utxos = await check_address_utxos(invoice.receive_address, koios_url=koios_url) + ( + new_status, + raw_lovelace, + total_value, + _received_assets, + tx_hash, + ) = await evaluate_utxos(invoice.expected_lovelace, utxos) - if new_status == payment.status and raw_lovelace == 0: - # No change, no UTXOs — skip DB write + if new_status == invoice.status and raw_lovelace == 0: + # No change and no UTXOs — nothing to persist. continue - payment.received_lovelace = raw_lovelace - payment.total_value_lovelace = total_value - payment.received_assets = received_assets + invoice.received_lovelace = raw_lovelace + if tx_hash and tx_hash not in invoice.tx_hashes: + invoice.tx_hashes.append(tx_hash) + # record_tx is idempotent per contract — call it so the store + # can persist the per-utxo history however it wants. + try: + await store.record_tx( + invoice.id, tx_hash, lovelace_delta=raw_lovelace + ) + except Exception as e: + logger.warning( + "[cardano-monitor] record_tx failed for %s/%s: %s", + invoice.id, + tx_hash[:12], + e, + ) - if tx_hash: - payment.tx_hash = tx_hash + if new_status != invoice.status: + old_status = invoice.status + invoice.status = new_status - if new_status != payment.status: - old_status = payment.status - payment.status = new_status - - if new_status in ("confirmed", "overpaid"): - payment.confirmed_at = now + if new_status in (InvoiceStatus.CONFIRMED, InvoiceStatus.OVERPAID): + invoice.confirmed_at = now logger.info( - "[cardano-monitor] payment #%d invoice_id=%d: %s -> %s (%.6f ADA received, %.6f ADA total value)", - payment.id, - payment.invoice_id or 0, - old_status, - new_status, + "[cardano-monitor] invoice=%s merchant=%s: %s -> %s " + "(%.6f ADA received, %.6f ADA total value)", + invoice.id, + invoice.merchant_id, + old_status.value, + new_status.value, raw_lovelace / 1_000_000, total_value / 1_000_000, ) + await store.update(invoice) + updates += 1 + except Exception as e: logger.exception( - "[cardano-monitor] Error checking payment #%d: %s", payment.id, e + "[cardano-monitor] Error checking invoice %s: %s", invoice.id, e ) - await db.commit() + return updates -async def reprice_expired_payments(db: AsyncSession) -> None: - """ - Reprice payments whose window has expired. +async def reprice_expired_invoices( + store: InvoiceStore, + window_minutes: int = DEFAULT_PAYMENT_WINDOW_MINUTES, + max_repricings: int = DEFAULT_MAX_REPRICINGS, + limit: int = 100, +) -> int: + """Reprice PENDING invoices whose expiry has passed. - Fetches the current ADA price, recalculates expected_lovelace, resets - expires_at to now + payment_window_minutes, and increments repriced_count. - Gives up after 3 repricings to avoid infinite loops. + Pulls the current ADA/USD oracle price, recalculates ``expected_lovelace`` + from the invoice's ``usd_amount``, resets ``expires_at`` to + ``now + window_minutes``, and tracks reprice count in ``invoice.metadata`` + under the key ``repriced_count``. After ``max_repricings`` the invoice + is transitioned to :class:`InvoiceStatus.EXPIRED`. + + Args: + store: Persistence backend. + window_minutes: New expiry window per reprice. Matches TradeCraft's + platform-config-driven value of 15 minutes by default. + max_repricings: Give-up threshold. TradeCraft default is 3. + limit: Max pending invoices to process per call. + + Returns: + Number of invoices updated (repriced or expired) this cycle. """ now = datetime.now(timezone.utc) + pending = await store.list_by_status(InvoiceStatus.PENDING, limit=limit) + if not pending: + return 0 - result = await db.execute( - select(CardanoPayment).where( - CardanoPayment.status == "pending", - CardanoPayment.expires_at <= now, - CardanoPayment.repriced_count < 3, - ) + expired_candidates = [ + inv for inv in pending if inv.expires_at is not None and inv.expires_at <= now + ] + if not expired_candidates: + return 0 + + logger.info( + "[cardano-monitor] Repricing %d expired invoice(s)", len(expired_candidates) ) - payments = result.scalars().all() - - if not payments: - return - - logger.info("[cardano-monitor] Repricing %d expired payment(s)", len(payments)) ada_price = await get_ada_usd_price() if ada_price <= 0: - logger.warning("[cardano-monitor] Cannot reprice — ADA price unavailable") - return - - # Read platform payment window - pc_result = await db.execute( - select(PlatformConfig).where(PlatformConfig.key == "cardano_payment_window_minutes") - ) - pc = pc_result.scalar_one_or_none() - try: - window_minutes = int(pc.value) if pc and pc.value else 15 - except (ValueError, TypeError): - window_minutes = 15 + logger.warning( + "[cardano-monitor] Cannot reprice — ADA price unavailable" + ) + return 0 new_expires_at = now + timedelta(minutes=window_minutes) + updated = 0 - for payment in payments: + for invoice in expired_candidates: try: - total_usd = float(payment.expected_usd or 0) - if total_usd <= 0: - payment.status = "expired" - logger.warning("[cardano-monitor] payment #%d has no expected_usd — marking expired", payment.id) + repriced_count = int(invoice.metadata.get("repriced_count", 0)) + + if repriced_count >= max_repricings: + invoice.status = InvoiceStatus.EXPIRED + await store.update(invoice) + updated += 1 + logger.info( + "[cardano-monitor] invoice %s expired after %d repricings", + invoice.id, + repriced_count, + ) continue - new_lovelace = await convert_usd_to_lovelace(total_usd) + usd_amount = float(invoice.usd_amount or 0) + if usd_amount <= 0: + invoice.status = InvoiceStatus.EXPIRED + await store.update(invoice) + updated += 1 + logger.warning( + "[cardano-monitor] invoice %s has no usd_amount — marking expired", + invoice.id, + ) + continue + + new_lovelace = await convert_usd_to_lovelace(usd_amount) if new_lovelace == 0: - logger.warning("[cardano-monitor] payment #%d: lovelace conversion returned 0, skipping", payment.id) + logger.warning( + "[cardano-monitor] invoice %s: lovelace conversion returned 0, skipping", + invoice.id, + ) continue - old_lovelace = payment.expected_lovelace - payment.expected_lovelace = new_lovelace - payment.ada_price_usd = Decimal(str(round(ada_price, 4))) - payment.expires_at = new_expires_at - payment.repriced_count += 1 + old_lovelace = invoice.expected_lovelace + invoice.expected_lovelace = new_lovelace + invoice.expires_at = new_expires_at + invoice.metadata["repriced_count"] = repriced_count + 1 + invoice.metadata["ada_price_usd"] = round(ada_price, 4) + + await store.update(invoice) + updated += 1 logger.info( - "[cardano-monitor] Repriced payment #%d: %d -> %d lovelace (ADA=$%.4f, reprice #%d)", - payment.id, old_lovelace or 0, new_lovelace, ada_price, payment.repriced_count, + "[cardano-monitor] Repriced invoice %s: %d -> %d lovelace " + "(ADA=$%.4f, reprice #%d)", + invoice.id, + old_lovelace or 0, + new_lovelace, + ada_price, + repriced_count + 1, ) except Exception as e: - logger.exception("[cardano-monitor] Error repricing payment #%d: %s", payment.id, e) + logger.exception( + "[cardano-monitor] Error repricing invoice %s: %s", invoice.id, e + ) - # Mark payments that have exceeded max repricings as expired - expired_result = await db.execute( - select(CardanoPayment).where( - CardanoPayment.status == "pending", - CardanoPayment.expires_at <= now, - CardanoPayment.repriced_count >= 3, - ) - ) - for payment in expired_result.scalars().all(): - payment.status = "expired" - logger.info("[cardano-monitor] payment #%d marked expired after %d repricings", payment.id, payment.repriced_count) - - await db.commit() + return updated diff --git a/cardano_checkout/scheduler.py b/cardano_checkout/scheduler.py index 242b1ab..c53cec2 100644 --- a/cardano_checkout/scheduler.py +++ b/cardano_checkout/scheduler.py @@ -1,465 +1,170 @@ +"""APScheduler integration for the Cardano payment monitoring loop. + +The scheduler drives two jobs against a consumer-supplied +:class:`~cardano_checkout.store.InvoiceStore`: + +- :func:`cardano_checkout.monitor.check_pending_invoices` — every 15 seconds +- :func:`cardano_checkout.monitor.reprice_expired_invoices` — every 60 seconds + +That's the *full* SDK job surface. The subscription-level + grace-period +jobs that the original TradeCraft scheduler shipped are TradeCraft-specific +(they touch ``Company``, ``Subscription``, ``SubscriptionPayment`` models +that are merchant-specific) — those live in +:mod:`cardano_checkout.tradecraft_compat` so TradeCraft can still import the +exact wrappers it has always used, without polluting the generic SDK. + +Usage:: + + from cardano_checkout.scheduler import InvoiceScheduler + from my_app.store import MySqlInvoiceStore + + scheduler = InvoiceScheduler(store=MySqlInvoiceStore()) + await scheduler.start() + # ... app runs ... + await scheduler.stop() """ -Cardano Payment Monitoring Scheduler -APScheduler integration that runs five recurring jobs: - - check_pending_payments — every 15 seconds - - reprice_expired_payments — every 60 seconds - - _check_subscription_payments — every 60 seconds - - _reprice_subscription_payments — every 6 hours - - _enforce_grace_period — daily at 06:00 UTC +from __future__ import annotations -Usage in main.py lifespan: - from services.cardano_scheduler import start_cardano_scheduler, stop_cardano_scheduler - - @asynccontextmanager - async def lifespan(app: FastAPI): - await start_cardano_scheduler() - yield - await stop_cardano_scheduler() -""" import logging -from datetime import datetime, timedelta, timezone -from decimal import Decimal +from dataclasses import dataclass, field from typing import Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler -from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger -from sqlalchemy import select -from database import async_session_maker -from services.cardano_monitor import ( - _check_address_utxos, - _evaluate_payment, - check_pending_payments, - reprice_expired_payments, +from cardano_checkout.monitor import ( + DEFAULT_MAX_REPRICINGS, + DEFAULT_PAYMENT_WINDOW_MINUTES, + KOIOS_URL, + check_pending_invoices, + reprice_expired_invoices, ) -from services.cardano_price import convert_usd_to_lovelace, get_ada_usd_price +from cardano_checkout.store import InvoiceStore logger = logging.getLogger(__name__) -_scheduler: Optional[AsyncIOScheduler] = None +@dataclass +class InvoiceScheduler: + """APScheduler harness around the monitor loop. -# ============================================================================= -# Scheduled job wrappers — invoice payments -# ============================================================================= - -async def _job_check_pending() -> None: - """Scheduler wrapper for check_pending_payments.""" - try: - async with async_session_maker() as db: - await check_pending_payments(db) - except Exception: - logger.exception("[cardano-scheduler] check_pending_payments job failed") - - -async def _job_reprice_expired() -> None: - """Scheduler wrapper for reprice_expired_payments.""" - try: - async with async_session_maker() as db: - await reprice_expired_payments(db) - except Exception: - logger.exception("[cardano-scheduler] reprice_expired_payments job failed") - - -# ============================================================================= -# Scheduled job wrappers — subscription payments -# ============================================================================= - -async def _job_check_subscription_payments() -> None: + Attributes: + store: Persistence backend. Required. + koios_url: Chain-query endpoint. Override for testnet / custom gateways. + check_interval_seconds: How often to poll Koios for pending invoices. + Defaults to 15 — identical to TradeCraft's production cadence. + reprice_interval_seconds: How often to sweep for expired invoices. + Defaults to 60. + payment_window_minutes: Re-expiry window when repricing. + max_repricings: How many times an invoice can reprice before giving up. + limit: Max invoices examined per poll cycle. + job_id_prefix: Scheduler job-id namespace. Override if running multiple + ``InvoiceScheduler`` instances in one process. """ - Poll Koios for UTXOs at awaiting_payment subscription addresses. - Reuses _check_address_utxos and _evaluate_payment from cardano_monitor. - On confirmation, advances subscription status to 'active' and updates - company.subscription_tier to match the subscription's tier. - """ - from models import Company, Subscription, SubscriptionPayment + store: InvoiceStore + koios_url: str = KOIOS_URL + check_interval_seconds: int = 15 + reprice_interval_seconds: int = 60 + payment_window_minutes: int = DEFAULT_PAYMENT_WINDOW_MINUTES + max_repricings: int = DEFAULT_MAX_REPRICINGS + limit: int = 100 + job_id_prefix: str = "cardano_checkout" + _scheduler: Optional[AsyncIOScheduler] = field(default=None, init=False, repr=False) - try: - async with async_session_maker() as db: - now = datetime.now(timezone.utc) - - result = await db.execute( - select(SubscriptionPayment).where( - SubscriptionPayment.status.in_(["awaiting_payment", "underpaid"]), - SubscriptionPayment.expires_at > now, - ) + async def _job_check_pending(self) -> None: + try: + await check_pending_invoices( + self.store, koios_url=self.koios_url, limit=self.limit ) - payments = result.scalars().all() - - if not payments: - return - - logger.debug( - "[cardano-scheduler] Checking %d subscription payment(s)", len(payments) + except Exception: + logger.exception( + "[cardano-scheduler] check_pending_invoices job failed" ) - for sp in payments: - try: - utxos = await _check_address_utxos(sp.address) - - # _evaluate_payment expects a CardanoPayment-like object. - # SubscriptionPayment has the same fields it needs. - new_status, raw_lovelace, total_value, received_assets, tx_hash = ( - await _evaluate_payment(sp, utxos) - ) - - # Map monitor statuses to subscription payment statuses - status_map = { - "pending": "awaiting_payment", - "confirmed": "confirmed", - "overpaid": "overpaid", - "underpaid": "underpaid", - } - mapped_status = status_map.get(new_status, new_status) - - if mapped_status == sp.status and raw_lovelace == 0: - continue - - sp.received_lovelace = raw_lovelace - sp.total_value_lovelace = total_value - sp.received_assets = received_assets - - if tx_hash: - sp.tx_hash = tx_hash - - if mapped_status != sp.status: - old_status = sp.status - sp.status = mapped_status - - if mapped_status in ("confirmed", "overpaid"): - sp.confirmed_at = now - - # Advance subscription + company tier - if sp.subscription_id: - sub_result = await db.execute( - select(Subscription).where( - Subscription.id == sp.subscription_id - ) - ) - sub = sub_result.scalar_one_or_none() - if sub: - sub.status = "active" - sub.updated_at = now - - # Apply any pending tier downgrade - if sub.pending_tier and sp.period_end and sp.period_end <= now.date(): - sub.tier = sub.pending_tier - sub.pending_tier = None - sub.pending_tier_at = None - - company_result = await db.execute( - select(Company).where(Company.id == sp.company_id) - ) - company = company_result.scalar_one_or_none() - if company: - # Get current sub tier - sub_result2 = await db.execute( - select(Subscription).where( - Subscription.company_id == sp.company_id - ) - ) - sub2 = sub_result2.scalar_one_or_none() - if sub2: - company.subscription_tier = sub2.tier - company.subscription_status = "active" - - logger.info( - "[cardano-scheduler] sub_payment #%d company_id=%d: %s -> %s" - " (%.6f ADA received)", - sp.id, - sp.company_id, - old_status, - mapped_status, - raw_lovelace / 1_000_000, - ) - - except Exception as e: - logger.exception( - "[cardano-scheduler] Error checking sub_payment #%d: %s", sp.id, e - ) - - await db.commit() - - except Exception: - logger.exception("[cardano-scheduler] _check_subscription_payments job failed") - - -async def _job_reprice_subscription_payments() -> None: - """ - Reprice awaiting_payment subscription records whose 24-hour window has expired. - - Fetches the current ADA price, recalculates expected_lovelace, resets expires_at - to now + 24 hours, and increments repriced_count. Gives up after 3 repricings. - """ - from models import SubscriptionPayment - - try: - async with async_session_maker() as db: - now = datetime.now(timezone.utc) - - result = await db.execute( - select(SubscriptionPayment).where( - SubscriptionPayment.status == "awaiting_payment", - SubscriptionPayment.expires_at <= now, - SubscriptionPayment.repriced_count < 3, - ) + async def _job_reprice_expired(self) -> None: + try: + await reprice_expired_invoices( + self.store, + window_minutes=self.payment_window_minutes, + max_repricings=self.max_repricings, + limit=self.limit, ) - payments = result.scalars().all() - - if not payments: - return - - logger.info( - "[cardano-scheduler] Repricing %d subscription payment(s)", len(payments) + except Exception: + logger.exception( + "[cardano-scheduler] reprice_expired_invoices job failed" ) - ada_price = await get_ada_usd_price() - if ada_price <= 0: - logger.warning( - "[cardano-scheduler] Cannot reprice subscriptions — ADA price unavailable" - ) - return + async def start(self) -> None: + """Start the scheduler. Safe to call repeatedly.""" + if self._scheduler and self._scheduler.running: + logger.debug("[cardano-scheduler] Already running — skipping start") + return - new_expires_at = now + timedelta(hours=24) + self._scheduler = AsyncIOScheduler() - for sp in payments: - try: - total_usd = float(sp.expected_usd or 0) - if total_usd <= 0: - sp.status = "expired" - logger.warning( - "[cardano-scheduler] sub_payment #%d has no expected_usd — expired", - sp.id, - ) - continue + self._scheduler.add_job( + self._job_check_pending, + trigger=IntervalTrigger(seconds=self.check_interval_seconds), + id=f"{self.job_id_prefix}_check_pending", + name="Cardano: Check Pending Invoices", + replace_existing=True, + max_instances=1, + coalesce=True, + ) - new_lovelace = await convert_usd_to_lovelace(total_usd) - if new_lovelace == 0: - logger.warning( - "[cardano-scheduler] sub_payment #%d: lovelace conversion returned 0, skipping", - sp.id, - ) - continue + self._scheduler.add_job( + self._job_reprice_expired, + trigger=IntervalTrigger(seconds=self.reprice_interval_seconds), + id=f"{self.job_id_prefix}_reprice_expired", + name="Cardano: Reprice Expired Invoices", + replace_existing=True, + max_instances=1, + coalesce=True, + ) - old_lovelace = sp.expected_lovelace - sp.expected_lovelace = new_lovelace - sp.ada_price_usd = Decimal(str(round(ada_price, 4))) - sp.expires_at = new_expires_at - sp.repriced_count += 1 + self._scheduler.start() - logger.info( - "[cardano-scheduler] Repriced sub_payment #%d: %d -> %d lovelace" - " (ADA=$%.4f, reprice #%d)", - sp.id, - old_lovelace or 0, - new_lovelace, - ada_price, - sp.repriced_count, - ) + logger.info( + "[cardano-scheduler] Started — check_pending every %ds, reprice_expired every %ds", + self.check_interval_seconds, + self.reprice_interval_seconds, + ) - except Exception as e: - logger.exception( - "[cardano-scheduler] Error repricing sub_payment #%d: %s", sp.id, e - ) - - # Mark max-repriced payments as expired - expired_result = await db.execute( - select(SubscriptionPayment).where( - SubscriptionPayment.status == "awaiting_payment", - SubscriptionPayment.expires_at <= now, - SubscriptionPayment.repriced_count >= 3, - ) - ) - for sp in expired_result.scalars().all(): - sp.status = "expired" - logger.info( - "[cardano-scheduler] sub_payment #%d expired after %d repricings", - sp.id, - sp.repriced_count, - ) - - await db.commit() - - except Exception: - logger.exception("[cardano-scheduler] _reprice_subscription_payments job failed") + async def stop(self) -> None: + """Stop the scheduler. Idempotent.""" + if self._scheduler: + self._scheduler.shutdown(wait=False) + self._scheduler = None + logger.info("[cardano-scheduler] Stopped") -async def _job_enforce_grace_period() -> None: - """ - Daily enforcement of subscription grace periods (runs at 06:00 UTC). +# --------------------------------------------------------------------------- +# Backwards-compatible free-function API +# --------------------------------------------------------------------------- +# +# Early adopters may have imported ``start_cardano_scheduler`` / ``stop_cardano_scheduler`` +# directly. Provide those as thin wrappers around a module-level default instance. +# Using the InvoiceScheduler class is preferred for anything nontrivial. - Rules: - - If due_date has passed and payment is not confirmed: mark subscription past_due - - If grace_deadline has passed and payment still not confirmed: suspend subscription - and update company.subscription_status accordingly - """ - from models import Company, Subscription, SubscriptionPayment - - try: - async with async_session_maker() as db: - today = datetime.now(timezone.utc).date() - - # Find subscriptions that are active/past_due and have an overdue payment - overdue_result = await db.execute( - select(SubscriptionPayment).where( - SubscriptionPayment.status.in_(["awaiting_payment", "underpaid", "expired"]), - SubscriptionPayment.due_date < today, - ) - ) - overdue_payments = overdue_result.scalars().all() - - for sp in overdue_payments: - try: - sub_result = await db.execute( - select(Subscription).where( - Subscription.company_id == sp.company_id - ) - ) - sub = sub_result.scalar_one_or_none() - if not sub or sub.status in ("cancelled", "suspended"): - continue - - company_result = await db.execute( - select(Company).where(Company.id == sp.company_id) - ) - company = company_result.scalar_one_or_none() - - # Grace deadline passed → suspend - if sp.grace_deadline and today > sp.grace_deadline: - if sub.status != "suspended": - sub.status = "suspended" - sub.updated_at = datetime.now(timezone.utc) - if company: - company.subscription_status = "suspended" - logger.info( - "[cardano-scheduler] company_id=%d subscription suspended" - " (grace deadline %s passed)", - sp.company_id, - sp.grace_deadline, - ) - - # Due date passed but within grace → mark past_due - elif sub.status == "active": - sub.status = "past_due" - sub.updated_at = datetime.now(timezone.utc) - if company: - company.subscription_status = "past_due" - logger.info( - "[cardano-scheduler] company_id=%d subscription past_due" - " (due_date %s passed)", - sp.company_id, - sp.due_date, - ) - - except Exception as e: - logger.exception( - "[cardano-scheduler] Error enforcing grace period for sub_payment #%d: %s", - sp.id, - e, - ) - - await db.commit() - - except Exception: - logger.exception("[cardano-scheduler] _enforce_grace_period job failed") +_default: Optional[InvoiceScheduler] = None -# ============================================================================= -# Lifecycle -# ============================================================================= - -async def start_cardano_scheduler() -> None: - """ - Start the Cardano payment monitoring scheduler. - - Registers five jobs: - - check_pending_payments: every 15 seconds - - reprice_expired_payments: every 60 seconds - - check_subscription_payments: every 60 seconds - - reprice_subscription_payments: every 6 hours - - enforce_grace_period: daily at 06:00 UTC - - Safe to call multiple times — skips if already running. - """ - global _scheduler - - if _scheduler and _scheduler.running: - logger.debug("[cardano-scheduler] Already running — skipping start") - return - - _scheduler = AsyncIOScheduler() - - _scheduler.add_job( - _job_check_pending, - trigger=IntervalTrigger(seconds=15), - id="cardano_check_pending", - name="Cardano: Check Pending Payments", - replace_existing=True, - max_instances=1, - coalesce=True, - ) - - _scheduler.add_job( - _job_reprice_expired, - trigger=IntervalTrigger(seconds=60), - id="cardano_reprice_expired", - name="Cardano: Reprice Expired Payments", - replace_existing=True, - max_instances=1, - coalesce=True, - ) - - _scheduler.add_job( - _job_check_subscription_payments, - trigger=IntervalTrigger(seconds=60), - id="cardano_check_sub_payments", - name="Cardano: Check Subscription Payments", - replace_existing=True, - max_instances=1, - coalesce=True, - ) - - _scheduler.add_job( - _job_reprice_subscription_payments, - trigger=IntervalTrigger(hours=6), - id="cardano_reprice_sub_payments", - name="Cardano: Reprice Subscription Payments", - replace_existing=True, - max_instances=1, - coalesce=True, - ) - - _scheduler.add_job( - _job_enforce_grace_period, - trigger=CronTrigger(hour=6, minute=0, timezone="UTC"), - id="cardano_enforce_grace", - name="Cardano: Enforce Subscription Grace Periods", - replace_existing=True, - max_instances=1, - coalesce=True, - ) - - _scheduler.start() - - logger.info( - "[cardano-scheduler] Started — check_pending every 15s, reprice_expired every 60s," - " check_sub_payments every 60s, reprice_sub_payments every 6h," - " enforce_grace_period daily at 06:00 UTC" - ) +async def start_cardano_scheduler( + store: InvoiceStore, **kwargs +) -> InvoiceScheduler: # pragma: no cover — convenience shim + """Start the default :class:`InvoiceScheduler` singleton.""" + global _default + if _default is None: + _default = InvoiceScheduler(store=store, **kwargs) + await _default.start() + return _default -async def stop_cardano_scheduler() -> None: - """ - Gracefully stop the Cardano scheduler. - - Call this in the FastAPI lifespan shutdown block. - """ - global _scheduler - - if _scheduler: - _scheduler.shutdown(wait=False) - _scheduler = None - logger.info("[cardano-scheduler] Stopped") +async def stop_cardano_scheduler() -> None: # pragma: no cover — convenience shim + """Stop the default :class:`InvoiceScheduler` singleton.""" + global _default + if _default is not None: + await _default.stop() + _default = None diff --git a/cardano_checkout/store.py b/cardano_checkout/store.py index a082a2d..fea94c2 100644 --- a/cardano_checkout/store.py +++ b/cardano_checkout/store.py @@ -8,10 +8,17 @@ The SDK does not prescribe a database. Consumers implement All methods are async so the same Protocol works cleanly for both asyncpg/asyncio-sqlalchemy backends and synchronous backends wrapped with ``asyncio.to_thread``. + +This module also ships :class:`InMemoryStore` — a reference implementation +used by the test suite and useful as a drop-in for local development or +ephemeral workflows that don't need durability. """ from __future__ import annotations +import asyncio +import copy +from dataclasses import dataclass, field from typing import Optional, Protocol, runtime_checkable from cardano_checkout.invoice import Invoice, InvoiceStatus @@ -67,3 +74,83 @@ class InvoiceStore(Protocol): state. """ ... + + +# --------------------------------------------------------------------------- +# Reference implementation: InMemoryStore +# --------------------------------------------------------------------------- + + +@dataclass +class InMemoryStore: + """In-memory :class:`InvoiceStore` — intended for tests and local dev. + + Uses an ``asyncio.Lock`` around mutating operations so concurrent callers + see a consistent view. Objects are deep-copied on read so callers can't + mutate the stored state by accident. + """ + + _invoices: dict[str, Invoice] = field(default_factory=dict) + _tx_log: dict[tuple[str, str], int] = field(default_factory=dict) + _index_counters: dict[str, int] = field(default_factory=dict) + _lock: asyncio.Lock = field(default_factory=asyncio.Lock) + + async def create(self, invoice: Invoice) -> None: + async with self._lock: + if invoice.id in self._invoices: + raise ValueError(f"Invoice {invoice.id!r} already exists") + self._invoices[invoice.id] = copy.deepcopy(invoice) + # Bump the per-merchant index cursor so next_derivation_index doesn't + # hand out the same slot again. + cur = self._index_counters.get(invoice.merchant_id, -1) + if invoice.derivation_index > cur: + self._index_counters[invoice.merchant_id] = invoice.derivation_index + + async def get(self, invoice_id: str) -> Optional[Invoice]: + async with self._lock: + stored = self._invoices.get(invoice_id) + return copy.deepcopy(stored) if stored else None + + async def list_by_status( + self, status: InvoiceStatus, limit: int = 100 + ) -> list[Invoice]: + async with self._lock: + matching = [ + copy.deepcopy(inv) + for inv in self._invoices.values() + if inv.status == status + ] + # Newest first per contract. + matching.sort(key=lambda inv: inv.created_at, reverse=True) + return matching[:limit] + + async def update(self, invoice: Invoice) -> None: + async with self._lock: + if invoice.id not in self._invoices: + raise KeyError(f"Invoice {invoice.id!r} not found") + self._invoices[invoice.id] = copy.deepcopy(invoice) + + async def next_derivation_index(self, merchant_id: str) -> int: + async with self._lock: + nxt = self._index_counters.get(merchant_id, -1) + 1 + self._index_counters[merchant_id] = nxt + return nxt + + async def record_tx( + self, invoice_id: str, tx_hash: str, lovelace_delta: int + ) -> None: + async with self._lock: + # Idempotent: (invoice_id, tx_hash) overwrites the delta rather than + # adding it a second time — see InvoiceStore.record_tx contract. + self._tx_log[(invoice_id, tx_hash)] = lovelace_delta + inv = self._invoices.get(invoice_id) + if inv and tx_hash not in inv.tx_hashes: + inv.tx_hashes.append(tx_hash) + + # --- test helpers (not part of the Protocol) --- + + def _all(self) -> list[Invoice]: + return [copy.deepcopy(inv) for inv in self._invoices.values()] + + def _tx_records(self) -> dict[tuple[str, str], int]: + return dict(self._tx_log) diff --git a/cardano_checkout/tradecraft_compat.py b/cardano_checkout/tradecraft_compat.py new file mode 100644 index 0000000..23ecdc6 --- /dev/null +++ b/cardano_checkout/tradecraft_compat.py @@ -0,0 +1,440 @@ +"""TradeCraft-specific compatibility shim. + +TradeCraft's ``services/cardano_scheduler.py`` shipped five jobs, of which +only two — ``check_pending_payments`` and ``reprice_expired_payments`` — +are generic invoice logic. The remaining three +(``_check_subscription_payments``, ``_reprice_subscription_payments``, +``_enforce_grace_period``) manipulate TradeCraft's ``Company``, +``Subscription``, and ``SubscriptionPayment`` SQLAlchemy models directly; +those are merchant-specific concerns that do not belong in the generic SDK. + +This module preserves the original TradeCraft import surface so the +existing TradeCraft code path still works while the migration to +:class:`cardano_checkout.store.InvoiceStore` is in-flight. None of the +symbols here are meant to be used by new consumers. + +**Do not depend on this module outside TradeCraft.** It is scheduled to +be removed once TradeCraft migrates fully to the Protocol-based API (see +TODO in the repo README). + +All functions in here are *verbatim* lifts from the original +``services/cardano_scheduler.py`` — TradeCraft's ``models`` + ``database`` +modules are imported lazily so that importing this module never fails +for non-TradeCraft consumers. If TradeCraft's models are not importable +the jobs raise at call time, not at import time. +""" + +from __future__ import annotations + +import logging +from datetime import datetime, timedelta, timezone +from decimal import Decimal +from typing import Optional + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger + +from cardano_checkout.monitor import check_address_utxos, evaluate_utxos +from cardano_checkout.oracles import convert_usd_to_lovelace, get_ada_usd_price + +logger = logging.getLogger(__name__) + + +# Original TradeCraft free-function names — keep these stable. +_check_address_utxos = check_address_utxos +_evaluate_payment = evaluate_utxos + + +def _require_tradecraft_models(): + """Lazy import of the TradeCraft-specific models + session maker. + + Returns ``(Company, Subscription, SubscriptionPayment, async_session_maker)``. + Raises ImportError with a clear message if TradeCraft isn't installed. + """ + try: + from database import async_session_maker # type: ignore[import-not-found] + from models import ( # type: ignore[import-not-found] + Company, + Subscription, + SubscriptionPayment, + ) + except ImportError as exc: # pragma: no cover — only meaningful inside TradeCraft + raise ImportError( + "cardano_checkout.tradecraft_compat requires the TradeCraft app's " + "`models` + `database` modules to be importable. This shim is " + "only meant to be used from within TradeCraft itself." + ) from exc + return Company, Subscription, SubscriptionPayment, async_session_maker + + +# --------------------------------------------------------------------------- +# Subscription payments job — verbatim TradeCraft logic +# --------------------------------------------------------------------------- + + +async def check_subscription_payments() -> None: # pragma: no cover — TradeCraft-only + """Poll Koios for UTXOs at awaiting_payment subscription addresses. + + On confirmation, advances ``Subscription.status`` to ``"active"`` and + updates ``Company.subscription_tier``. TradeCraft-specific. + """ + from sqlalchemy import select + + Company, Subscription, SubscriptionPayment, async_session_maker = ( + _require_tradecraft_models() + ) + + try: + async with async_session_maker() as db: + now = datetime.now(timezone.utc) + + result = await db.execute( + select(SubscriptionPayment).where( + SubscriptionPayment.status.in_(["awaiting_payment", "underpaid"]), + SubscriptionPayment.expires_at > now, + ) + ) + payments = result.scalars().all() + + if not payments: + return + + logger.debug( + "[tradecraft-compat] Checking %d subscription payment(s)", + len(payments), + ) + + for sp in payments: + try: + utxos = await _check_address_utxos(sp.address) + expected = sp.expected_lovelace or 0 + ( + new_status_enum, + raw_lovelace, + total_value, + received_assets, + tx_hash, + ) = await _evaluate_payment(expected, utxos) + new_status = new_status_enum.value + + status_map = { + "pending": "awaiting_payment", + "confirmed": "confirmed", + "overpaid": "overpaid", + "underpaid": "underpaid", + } + mapped_status = status_map.get(new_status, new_status) + + if mapped_status == sp.status and raw_lovelace == 0: + continue + + sp.received_lovelace = raw_lovelace + sp.total_value_lovelace = total_value + sp.received_assets = received_assets + + if tx_hash: + sp.tx_hash = tx_hash + + if mapped_status != sp.status: + old_status = sp.status + sp.status = mapped_status + + if mapped_status in ("confirmed", "overpaid"): + sp.confirmed_at = now + + if sp.subscription_id: + sub_result = await db.execute( + select(Subscription).where( + Subscription.id == sp.subscription_id + ) + ) + sub = sub_result.scalar_one_or_none() + if sub: + sub.status = "active" + sub.updated_at = now + if ( + sub.pending_tier + and sp.period_end + and sp.period_end <= now.date() + ): + sub.tier = sub.pending_tier + sub.pending_tier = None + sub.pending_tier_at = None + + company_result = await db.execute( + select(Company).where(Company.id == sp.company_id) + ) + company = company_result.scalar_one_or_none() + if company: + sub_result2 = await db.execute( + select(Subscription).where( + Subscription.company_id == sp.company_id + ) + ) + sub2 = sub_result2.scalar_one_or_none() + if sub2: + company.subscription_tier = sub2.tier + company.subscription_status = "active" + + logger.info( + "[tradecraft-compat] sub_payment #%d company_id=%d: " + "%s -> %s (%.6f ADA received)", + sp.id, + sp.company_id, + old_status, + mapped_status, + raw_lovelace / 1_000_000, + ) + + except Exception as e: + logger.exception( + "[tradecraft-compat] Error checking sub_payment #%d: %s", + sp.id, + e, + ) + + await db.commit() + + except Exception: + logger.exception( + "[tradecraft-compat] check_subscription_payments job failed" + ) + + +async def reprice_subscription_payments() -> None: # pragma: no cover — TradeCraft-only + """Reprice expired subscription payments — 24h window, 3-reprice cap.""" + from sqlalchemy import select + + _, _, SubscriptionPayment, async_session_maker = _require_tradecraft_models() + + try: + async with async_session_maker() as db: + now = datetime.now(timezone.utc) + + result = await db.execute( + select(SubscriptionPayment).where( + SubscriptionPayment.status == "awaiting_payment", + SubscriptionPayment.expires_at <= now, + SubscriptionPayment.repriced_count < 3, + ) + ) + payments = result.scalars().all() + + if not payments: + return + + logger.info( + "[tradecraft-compat] Repricing %d subscription payment(s)", + len(payments), + ) + + ada_price = await get_ada_usd_price() + if ada_price <= 0: + logger.warning( + "[tradecraft-compat] Cannot reprice subscriptions — " + "ADA price unavailable" + ) + return + + new_expires_at = now + timedelta(hours=24) + + for sp in payments: + try: + total_usd = float(sp.expected_usd or 0) + if total_usd <= 0: + sp.status = "expired" + continue + + new_lovelace = await convert_usd_to_lovelace(total_usd) + if new_lovelace == 0: + continue + + old_lovelace = sp.expected_lovelace + sp.expected_lovelace = new_lovelace + sp.ada_price_usd = Decimal(str(round(ada_price, 4))) + sp.expires_at = new_expires_at + sp.repriced_count += 1 + + logger.info( + "[tradecraft-compat] Repriced sub_payment #%d: %d -> %d " + "lovelace (ADA=$%.4f, reprice #%d)", + sp.id, + old_lovelace or 0, + new_lovelace, + ada_price, + sp.repriced_count, + ) + + except Exception as e: + logger.exception( + "[tradecraft-compat] Error repricing sub_payment #%d: %s", + sp.id, + e, + ) + + expired_result = await db.execute( + select(SubscriptionPayment).where( + SubscriptionPayment.status == "awaiting_payment", + SubscriptionPayment.expires_at <= now, + SubscriptionPayment.repriced_count >= 3, + ) + ) + for sp in expired_result.scalars().all(): + sp.status = "expired" + logger.info( + "[tradecraft-compat] sub_payment #%d expired after %d repricings", + sp.id, + sp.repriced_count, + ) + + await db.commit() + + except Exception: + logger.exception( + "[tradecraft-compat] reprice_subscription_payments job failed" + ) + + +async def enforce_grace_period() -> None: # pragma: no cover — TradeCraft-only + """Daily grace-period enforcement — past_due / suspended transitions.""" + from sqlalchemy import select + + Company, Subscription, SubscriptionPayment, async_session_maker = ( + _require_tradecraft_models() + ) + + try: + async with async_session_maker() as db: + today = datetime.now(timezone.utc).date() + + overdue_result = await db.execute( + select(SubscriptionPayment).where( + SubscriptionPayment.status.in_( + ["awaiting_payment", "underpaid", "expired"] + ), + SubscriptionPayment.due_date < today, + ) + ) + overdue_payments = overdue_result.scalars().all() + + for sp in overdue_payments: + try: + sub_result = await db.execute( + select(Subscription).where( + Subscription.company_id == sp.company_id + ) + ) + sub = sub_result.scalar_one_or_none() + if not sub or sub.status in ("cancelled", "suspended"): + continue + + company_result = await db.execute( + select(Company).where(Company.id == sp.company_id) + ) + company = company_result.scalar_one_or_none() + + if sp.grace_deadline and today > sp.grace_deadline: + if sub.status != "suspended": + sub.status = "suspended" + sub.updated_at = datetime.now(timezone.utc) + if company: + company.subscription_status = "suspended" + logger.info( + "[tradecraft-compat] company_id=%d suspended " + "(grace deadline %s passed)", + sp.company_id, + sp.grace_deadline, + ) + elif sub.status == "active": + sub.status = "past_due" + sub.updated_at = datetime.now(timezone.utc) + if company: + company.subscription_status = "past_due" + logger.info( + "[tradecraft-compat] company_id=%d past_due " + "(due_date %s passed)", + sp.company_id, + sp.due_date, + ) + + except Exception as e: + logger.exception( + "[tradecraft-compat] Error enforcing grace period " + "for sub_payment #%d: %s", + sp.id, + e, + ) + + await db.commit() + + except Exception: + logger.exception( + "[tradecraft-compat] enforce_grace_period job failed" + ) + + +# --------------------------------------------------------------------------- +# Standalone TradeCraft scheduler — registers the subscription jobs only. +# --------------------------------------------------------------------------- + + +_tc_scheduler: Optional[AsyncIOScheduler] = None + + +async def start_tradecraft_scheduler() -> None: # pragma: no cover — TradeCraft-only + """Start ONLY the TradeCraft-specific subscription + grace-period jobs. + + The generic invoice jobs should be run via :class:`InvoiceScheduler` + against a ``SQLAlchemyInvoiceStore`` adapter (not shipped here — + TradeCraft is responsible for implementing it during the migration). + """ + global _tc_scheduler + + if _tc_scheduler and _tc_scheduler.running: + return + + _tc_scheduler = AsyncIOScheduler() + + _tc_scheduler.add_job( + check_subscription_payments, + trigger=IntervalTrigger(seconds=60), + id="tradecraft_check_sub_payments", + name="TradeCraft: Check Subscription Payments", + replace_existing=True, + max_instances=1, + coalesce=True, + ) + + _tc_scheduler.add_job( + reprice_subscription_payments, + trigger=IntervalTrigger(hours=6), + id="tradecraft_reprice_sub_payments", + name="TradeCraft: Reprice Subscription Payments", + replace_existing=True, + max_instances=1, + coalesce=True, + ) + + _tc_scheduler.add_job( + enforce_grace_period, + trigger=CronTrigger(hour=6, minute=0, timezone="UTC"), + id="tradecraft_enforce_grace", + name="TradeCraft: Enforce Subscription Grace Periods", + replace_existing=True, + max_instances=1, + coalesce=True, + ) + + _tc_scheduler.start() + logger.info( + "[tradecraft-compat] Started — subscription + grace-period jobs only" + ) + + +async def stop_tradecraft_scheduler() -> None: # pragma: no cover — TradeCraft-only + """Stop the TradeCraft-specific scheduler.""" + global _tc_scheduler + if _tc_scheduler: + _tc_scheduler.shutdown(wait=False) + _tc_scheduler = None diff --git a/pyproject.toml b/pyproject.toml index c487be8..731b696 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "cardano-checkout" -version = "0.1.0.dev0" +version = "0.2.0.dev0" description = "Merchant-side Cardano payments SDK + NFT cert-of-authenticity minting (zero-custody)" readme = "README.md" requires-python = ">=3.10"