v0.2: refactor monitor + scheduler around InvoiceStore Protocol

Rewrite monitor.py so it operates entirely through the
cardano_checkout.store.InvoiceStore Protocol — no more SQLAlchemy
imports, no more CardanoPayment / PlatformConfig coupling. Same
behavioural shape: same Koios URL, same 15s check cadence, same 2%
confirm / overpay tolerances, same 3-reprice cap.

Rewrite scheduler.py as a reusable InvoiceScheduler dataclass that
wires two APScheduler jobs (check_pending every 15s, reprice_expired
every 60s) against a consumer-supplied store. The subscription +
grace-period jobs are TradeCraft-specific and get lifted into
tradecraft_compat.py verbatim so TradeCraft can still import them
during the migration window without any code change.

Add InMemoryStore reference implementation to store.py — used by the
test suite and handy for local dev / ephemeral workflows.

Bump version to 0.2.0-dev.
This commit is contained in:
Kayos 2026-04-23 19:55:28 -07:00
parent eef22dc5cd
commit a6d4ac8521
6 changed files with 952 additions and 596 deletions

View file

@ -21,17 +21,30 @@ For NFT minting see :mod:`cardano_checkout.mint`.
from __future__ import annotations
__version__ = "0.1.0-dev"
__version__ = "0.2.0-dev"
# Pure modules — stable API from extraction
from cardano_checkout import addresses, oracles # noqa: F401
# Payment lifecycle
from cardano_checkout.invoice import Invoice, InvoiceStatus # noqa: F401
from cardano_checkout.store import InvoiceStore # noqa: F401
from cardano_checkout.store import InMemoryStore, InvoiceStore # noqa: F401
# Monitoring + scheduling
from cardano_checkout.monitor import ( # noqa: F401
check_pending_invoices,
reprice_expired_invoices,
)
from cardano_checkout.scheduler import InvoiceScheduler # noqa: F401
# NFT + IPFS
from cardano_checkout.mint import MintPolicy, mint_nft_cert # noqa: F401
from cardano_checkout.mint import ( # noqa: F401
MintPolicy,
UnsignedMint,
build_cip25_metadata,
mint_nft_cert,
submit_signed_tx,
)
from cardano_checkout.ipfs import IPFSClient, pin_bytes # noqa: F401
__all__ = [
@ -41,8 +54,15 @@ __all__ = [
"Invoice",
"InvoiceStatus",
"InvoiceStore",
"InMemoryStore",
"InvoiceScheduler",
"check_pending_invoices",
"reprice_expired_invoices",
"MintPolicy",
"UnsignedMint",
"mint_nft_cert",
"submit_signed_tx",
"build_cip25_metadata",
"IPFSClient",
"pin_bytes",
]

View file

@ -1,123 +1,158 @@
"""
Cardano UTXO Monitoring Service
"""Cardano UTXO monitoring — framework-agnostic polling against :class:`InvoiceStore`.
Polls Koios API to detect on-chain payments at derived Cardano addresses.
Called by the scheduler every 15 seconds for pending payments, and every
60 seconds to reprice expired payment requests.
Polls Koios for on-chain payments at the receive addresses of invoices that
are still in a non-terminal state, updates invoice status through the
:class:`InvoiceStore` Protocol, and optionally reprices expired invoices
against the current ADA/USD oracle snapshot.
Called by :mod:`cardano_checkout.scheduler` every 15 seconds for pending
payments, and every 60 seconds to reprice expired ones.
Koios endpoint used::
Koios endpoint used:
POST https://api.koios.rest/api/v1/address_utxos
Body: {"_addresses": ["addr1..."]}
Status flow applied here:
pending -> confirmed (received >= expected * 0.98)
pending -> underpaid (received > 0 but < expected * 0.98)
pending -> overpaid (received >= expected * 1.02 still confirmed)
pending -> expired (handled by reprice_expired_payments)
Status transitions applied here::
PENDING CONFIRMED (received >= expected * CONFIRM_TOLERANCE)
PENDING UNDERPAID (received > 0 but below tolerance)
PENDING OVERPAID (received >= expected * OVERPAY_THRESHOLD)
PENDING EXPIRED (after reprice_count exhausts see reprice_expired_invoices)
Behavioral shape is identical to the original TradeCraft ``services/cardano_monitor.py``:
same polling intervals, same Koios URL, same 2% confirm / overpay tolerances.
The only change is that persistence is now delegated to the store Protocol
instead of being welded to SQLAlchemy + the ``CardanoPayment`` model.
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Optional
import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from models import CardanoPayment, Config, PlatformConfig
from services.cardano_price import (
convert_token_to_lovelace,
get_ada_usd_price,
convert_usd_to_lovelace,
from cardano_checkout.invoice import Invoice, InvoiceStatus
from cardano_checkout.oracles import (
KNOWN_TOKENS,
convert_token_to_lovelace,
convert_usd_to_lovelace,
get_ada_usd_price,
)
from cardano_checkout.store import InvoiceStore
logger = logging.getLogger(__name__)
KOIOS_URL = "https://api.koios.rest/api/v1/address_utxos"
KOIOS_TIMEOUT = 15 # seconds
# Tolerance for confirming payment (2%)
# Tolerance for confirming payment (2%) — unchanged from v0.1 / TradeCraft.
CONFIRM_TOLERANCE = 0.98
OVERPAY_THRESHOLD = 1.02
# Default reprice cap + window (matches TradeCraft defaults).
DEFAULT_MAX_REPRICINGS = 3
DEFAULT_PAYMENT_WINDOW_MINUTES = 15
# =============================================================================
# Koios API
# =============================================================================
async def _check_address_utxos(address: str) -> list[dict]:
"""
Query Koios for all UTXOs at the given Cardano address.
# ---------------------------------------------------------------------------
# Koios UTXO query
# ---------------------------------------------------------------------------
Returns a list of UTXO dicts from Koios, or an empty list on error.
Each UTXO has keys: tx_hash, tx_index, value (lovelace), asset_list.
async def check_address_utxos(
address: str, koios_url: str = KOIOS_URL, timeout: float = KOIOS_TIMEOUT
) -> list[dict]:
"""Query Koios for all UTXOs at ``address``.
Returns a list of UTXO dicts (each with ``tx_hash``, ``tx_index``,
``value``, ``asset_list``) or an empty list on any error. Never raises.
"""
try:
async with httpx.AsyncClient(timeout=KOIOS_TIMEOUT) as client:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(
KOIOS_URL,
koios_url,
json={"_addresses": [address]},
headers={"Accept": "application/json"},
)
resp.raise_for_status()
data = resp.json()
if not isinstance(data, list):
logger.warning("[cardano-monitor] Unexpected Koios response shape for %s", address[:20])
logger.warning(
"[cardano-monitor] Unexpected Koios response shape for %s",
address[:20],
)
return []
return data
except httpx.HTTPStatusError as e:
logger.error(
"[cardano-monitor] Koios HTTP %s for %s: %s",
e.response.status_code, address[:20], e.response.text[:200],
e.response.status_code,
address[:20],
e.response.text[:200],
)
return []
except httpx.TimeoutException:
logger.warning("[cardano-monitor] Koios timeout for address %s", address[:20])
return []
except Exception as e:
logger.error("[cardano-monitor] Koios unexpected error for %s: %s", address[:20], e)
except Exception as e: # pragma: no cover — defensive
logger.error(
"[cardano-monitor] Koios unexpected error for %s: %s", address[:20], e
)
return []
# =============================================================================
# Payment evaluation
# =============================================================================
# Backwards-compatible alias — monitor.py in TradeCraft imports the private name.
# Keeping a leading-underscore alias so the TradeCraft shim can still reach it.
_check_address_utxos = check_address_utxos
async def _evaluate_payment(payment: CardanoPayment, utxos: list[dict]) -> tuple[str, int, int, dict, Optional[str]]:
"""
Evaluate UTXOs against the expected payment and determine new status.
# ---------------------------------------------------------------------------
# UTXO evaluation
# ---------------------------------------------------------------------------
async def evaluate_utxos(
expected_lovelace: int, utxos: list[dict]
) -> tuple[InvoiceStatus, int, int, dict, Optional[str]]:
"""Classify a batch of UTXOs against an expected-lovelace target.
Returns:
(new_status, received_lovelace, total_value_lovelace, received_assets, tx_hash)
Tuple of ``(status, raw_lovelace, total_value_lovelace, received_assets, latest_tx_hash)``.
Status rules:
- No UTXOs -> "pending" (no change)
- total_value >= expected * OVERPAY_THRESHOLD -> "overpaid" (treated as confirmed)
- total_value >= expected * CONFIRM_TOLERANCE -> "confirmed"
- total_value > 0 but below tolerance -> "underpaid"
- ``status`` :class:`InvoiceStatus` the invoice should transition into.
``PENDING`` means "no change, keep polling".
- ``raw_lovelace`` pure ADA received (excluding native-asset value).
- ``total_value_lovelace`` ADA + ADA-equivalent of native assets via DexHunter.
- ``received_assets`` ``{policy_id.asset_name_hex: quantity}``.
- ``latest_tx_hash`` most recent observed tx hash, or None if no UTXOs.
Status rules mirror TradeCraft exactly:
- No UTXOs ``PENDING`` (no change)
- ``total_value >= expected * OVERPAY_THRESHOLD`` ``OVERPAID`` (treated as confirmed)
- ``total_value >= expected * CONFIRM_TOLERANCE`` ``CONFIRMED``
- ``total_value > 0`` but below tolerance ``UNDERPAID``
"""
if not utxos:
return "pending", 0, 0, {}, None
return InvoiceStatus.PENDING, 0, 0, {}, None
raw_lovelace = 0
received_assets: dict[str, int] = {}
latest_tx_hash: Optional[str] = None
for utxo in utxos:
# Sum ADA (lovelace)
try:
raw_lovelace += int(utxo.get("value", 0))
except (ValueError, TypeError):
pass
# Track latest tx_hash
tx = utxo.get("tx_hash")
if tx:
latest_tx_hash = tx
# Collect native assets
for asset in utxo.get("asset_list", []) or []:
policy_id = asset.get("policy_id", "")
asset_name = asset.get("asset_name", "")
@ -129,14 +164,13 @@ async def _evaluate_payment(payment: CardanoPayment, utxos: list[dict]) -> tuple
if qty > 0:
received_assets[asset_id] = received_assets.get(asset_id, 0) + qty
# Convert native assets to lovelace equivalent
# Convert native assets to lovelace equivalent via DexHunter.
asset_lovelace = 0
for asset_id, qty in received_assets.items():
if "." not in asset_id:
continue
policy_id, asset_name_hex = asset_id.split(".", 1)
# Find matching known token for decimals
decimals = 0
for token_info in KNOWN_TOKENS.values():
if token_info.get("policy_id") == policy_id:
@ -144,174 +178,244 @@ async def _evaluate_payment(payment: CardanoPayment, utxos: list[dict]) -> tuple
break
try:
lv = await convert_token_to_lovelace(policy_id, asset_name_hex, qty, decimals)
lv = await convert_token_to_lovelace(
policy_id, asset_name_hex, qty, decimals
)
if lv is not None:
asset_lovelace += lv
except Exception as e:
logger.warning("[cardano-monitor] Failed to convert asset %s to lovelace: %s", asset_id[:20], e)
logger.warning(
"[cardano-monitor] Failed to convert asset %s to lovelace: %s",
asset_id[:20],
e,
)
total_value = raw_lovelace + asset_lovelace
expected = payment.expected_lovelace or 0
if expected == 0:
# Degenerate case — treat any payment as confirmed
new_status = "confirmed"
elif total_value >= expected * OVERPAY_THRESHOLD:
new_status = "overpaid"
elif total_value >= expected * CONFIRM_TOLERANCE:
new_status = "confirmed"
if expected_lovelace == 0:
# Degenerate case — any payment at all counts.
new_status = InvoiceStatus.CONFIRMED if total_value > 0 else InvoiceStatus.PENDING
elif total_value >= expected_lovelace * OVERPAY_THRESHOLD:
new_status = InvoiceStatus.OVERPAID
elif total_value >= expected_lovelace * CONFIRM_TOLERANCE:
new_status = InvoiceStatus.CONFIRMED
elif total_value > 0:
new_status = "underpaid"
new_status = InvoiceStatus.UNDERPAID
else:
new_status = "pending"
new_status = InvoiceStatus.PENDING
return new_status, raw_lovelace, total_value, received_assets, latest_tx_hash
# =============================================================================
# Main monitoring functions (called by scheduler)
# =============================================================================
# Backwards-compatible alias.
_evaluate_utxos = evaluate_utxos
async def check_pending_payments(db: AsyncSession) -> None:
"""
Check all pending payments that haven't expired yet.
Queries Koios for UTXOs at each address. Updates payment status in place.
# ---------------------------------------------------------------------------
# Main monitoring entrypoints (scheduled jobs call these)
# ---------------------------------------------------------------------------
async def check_pending_invoices(
store: InvoiceStore,
koios_url: str = KOIOS_URL,
limit: int = 100,
) -> int:
"""Check every :class:`InvoiceStatus.PENDING` invoice for on-chain payment.
For each pending invoice whose expiry has not yet passed, query Koios
for the UTXOs at its receive address, evaluate them against
``expected_lovelace``, and transition the invoice state accordingly.
Args:
store: Persistence backend.
koios_url: Koios base URL. Override for testnet / custom gateways.
limit: Max pending invoices to process per poll.
Returns:
Number of invoices updated this cycle (for logging / metrics).
"""
now = datetime.now(timezone.utc)
pending = await store.list_by_status(InvoiceStatus.PENDING, limit=limit)
if not pending:
return 0
result = await db.execute(
select(CardanoPayment).where(
CardanoPayment.status == "pending",
CardanoPayment.expires_at > now,
)
)
payments = result.scalars().all()
# Filter out already-expired invoices — those get picked up by reprice_expired_invoices.
active = [
inv for inv in pending if inv.expires_at is None or inv.expires_at > now
]
if not active:
return 0
if not payments:
return
logger.debug("[cardano-monitor] Checking %d pending invoice(s)", len(active))
logger.debug("[cardano-monitor] Checking %d pending payment(s)", len(payments))
for payment in payments:
updates = 0
for invoice in active:
try:
utxos = await _check_address_utxos(payment.address)
new_status, raw_lovelace, total_value, received_assets, tx_hash = await _evaluate_payment(payment, utxos)
utxos = await check_address_utxos(invoice.receive_address, koios_url=koios_url)
(
new_status,
raw_lovelace,
total_value,
_received_assets,
tx_hash,
) = await evaluate_utxos(invoice.expected_lovelace, utxos)
if new_status == payment.status and raw_lovelace == 0:
# No change, no UTXOs — skip DB write
if new_status == invoice.status and raw_lovelace == 0:
# No change and no UTXOs — nothing to persist.
continue
payment.received_lovelace = raw_lovelace
payment.total_value_lovelace = total_value
payment.received_assets = received_assets
invoice.received_lovelace = raw_lovelace
if tx_hash and tx_hash not in invoice.tx_hashes:
invoice.tx_hashes.append(tx_hash)
# record_tx is idempotent per contract — call it so the store
# can persist the per-utxo history however it wants.
try:
await store.record_tx(
invoice.id, tx_hash, lovelace_delta=raw_lovelace
)
except Exception as e:
logger.warning(
"[cardano-monitor] record_tx failed for %s/%s: %s",
invoice.id,
tx_hash[:12],
e,
)
if tx_hash:
payment.tx_hash = tx_hash
if new_status != invoice.status:
old_status = invoice.status
invoice.status = new_status
if new_status != payment.status:
old_status = payment.status
payment.status = new_status
if new_status in ("confirmed", "overpaid"):
payment.confirmed_at = now
if new_status in (InvoiceStatus.CONFIRMED, InvoiceStatus.OVERPAID):
invoice.confirmed_at = now
logger.info(
"[cardano-monitor] payment #%d invoice_id=%d: %s -> %s (%.6f ADA received, %.6f ADA total value)",
payment.id,
payment.invoice_id or 0,
old_status,
new_status,
"[cardano-monitor] invoice=%s merchant=%s: %s -> %s "
"(%.6f ADA received, %.6f ADA total value)",
invoice.id,
invoice.merchant_id,
old_status.value,
new_status.value,
raw_lovelace / 1_000_000,
total_value / 1_000_000,
)
await store.update(invoice)
updates += 1
except Exception as e:
logger.exception(
"[cardano-monitor] Error checking payment #%d: %s", payment.id, e
"[cardano-monitor] Error checking invoice %s: %s", invoice.id, e
)
await db.commit()
return updates
async def reprice_expired_payments(db: AsyncSession) -> None:
"""
Reprice payments whose window has expired.
async def reprice_expired_invoices(
store: InvoiceStore,
window_minutes: int = DEFAULT_PAYMENT_WINDOW_MINUTES,
max_repricings: int = DEFAULT_MAX_REPRICINGS,
limit: int = 100,
) -> int:
"""Reprice PENDING invoices whose expiry has passed.
Fetches the current ADA price, recalculates expected_lovelace, resets
expires_at to now + payment_window_minutes, and increments repriced_count.
Gives up after 3 repricings to avoid infinite loops.
Pulls the current ADA/USD oracle price, recalculates ``expected_lovelace``
from the invoice's ``usd_amount``, resets ``expires_at`` to
``now + window_minutes``, and tracks reprice count in ``invoice.metadata``
under the key ``repriced_count``. After ``max_repricings`` the invoice
is transitioned to :class:`InvoiceStatus.EXPIRED`.
Args:
store: Persistence backend.
window_minutes: New expiry window per reprice. Matches TradeCraft's
platform-config-driven value of 15 minutes by default.
max_repricings: Give-up threshold. TradeCraft default is 3.
limit: Max pending invoices to process per call.
Returns:
Number of invoices updated (repriced or expired) this cycle.
"""
now = datetime.now(timezone.utc)
pending = await store.list_by_status(InvoiceStatus.PENDING, limit=limit)
if not pending:
return 0
result = await db.execute(
select(CardanoPayment).where(
CardanoPayment.status == "pending",
CardanoPayment.expires_at <= now,
CardanoPayment.repriced_count < 3,
)
expired_candidates = [
inv for inv in pending if inv.expires_at is not None and inv.expires_at <= now
]
if not expired_candidates:
return 0
logger.info(
"[cardano-monitor] Repricing %d expired invoice(s)", len(expired_candidates)
)
payments = result.scalars().all()
if not payments:
return
logger.info("[cardano-monitor] Repricing %d expired payment(s)", len(payments))
ada_price = await get_ada_usd_price()
if ada_price <= 0:
logger.warning("[cardano-monitor] Cannot reprice — ADA price unavailable")
return
# Read platform payment window
pc_result = await db.execute(
select(PlatformConfig).where(PlatformConfig.key == "cardano_payment_window_minutes")
)
pc = pc_result.scalar_one_or_none()
try:
window_minutes = int(pc.value) if pc and pc.value else 15
except (ValueError, TypeError):
window_minutes = 15
logger.warning(
"[cardano-monitor] Cannot reprice — ADA price unavailable"
)
return 0
new_expires_at = now + timedelta(minutes=window_minutes)
updated = 0
for payment in payments:
for invoice in expired_candidates:
try:
total_usd = float(payment.expected_usd or 0)
if total_usd <= 0:
payment.status = "expired"
logger.warning("[cardano-monitor] payment #%d has no expected_usd — marking expired", payment.id)
repriced_count = int(invoice.metadata.get("repriced_count", 0))
if repriced_count >= max_repricings:
invoice.status = InvoiceStatus.EXPIRED
await store.update(invoice)
updated += 1
logger.info(
"[cardano-monitor] invoice %s expired after %d repricings",
invoice.id,
repriced_count,
)
continue
new_lovelace = await convert_usd_to_lovelace(total_usd)
usd_amount = float(invoice.usd_amount or 0)
if usd_amount <= 0:
invoice.status = InvoiceStatus.EXPIRED
await store.update(invoice)
updated += 1
logger.warning(
"[cardano-monitor] invoice %s has no usd_amount — marking expired",
invoice.id,
)
continue
new_lovelace = await convert_usd_to_lovelace(usd_amount)
if new_lovelace == 0:
logger.warning("[cardano-monitor] payment #%d: lovelace conversion returned 0, skipping", payment.id)
logger.warning(
"[cardano-monitor] invoice %s: lovelace conversion returned 0, skipping",
invoice.id,
)
continue
old_lovelace = payment.expected_lovelace
payment.expected_lovelace = new_lovelace
payment.ada_price_usd = Decimal(str(round(ada_price, 4)))
payment.expires_at = new_expires_at
payment.repriced_count += 1
old_lovelace = invoice.expected_lovelace
invoice.expected_lovelace = new_lovelace
invoice.expires_at = new_expires_at
invoice.metadata["repriced_count"] = repriced_count + 1
invoice.metadata["ada_price_usd"] = round(ada_price, 4)
await store.update(invoice)
updated += 1
logger.info(
"[cardano-monitor] Repriced payment #%d: %d -> %d lovelace (ADA=$%.4f, reprice #%d)",
payment.id, old_lovelace or 0, new_lovelace, ada_price, payment.repriced_count,
"[cardano-monitor] Repriced invoice %s: %d -> %d lovelace "
"(ADA=$%.4f, reprice #%d)",
invoice.id,
old_lovelace or 0,
new_lovelace,
ada_price,
repriced_count + 1,
)
except Exception as e:
logger.exception("[cardano-monitor] Error repricing payment #%d: %s", payment.id, e)
logger.exception(
"[cardano-monitor] Error repricing invoice %s: %s", invoice.id, e
)
# Mark payments that have exceeded max repricings as expired
expired_result = await db.execute(
select(CardanoPayment).where(
CardanoPayment.status == "pending",
CardanoPayment.expires_at <= now,
CardanoPayment.repriced_count >= 3,
)
)
for payment in expired_result.scalars().all():
payment.status = "expired"
logger.info("[cardano-monitor] payment #%d marked expired after %d repricings", payment.id, payment.repriced_count)
await db.commit()
return updated

View file

@ -1,465 +1,170 @@
"""APScheduler integration for the Cardano payment monitoring loop.
The scheduler drives two jobs against a consumer-supplied
:class:`~cardano_checkout.store.InvoiceStore`:
- :func:`cardano_checkout.monitor.check_pending_invoices` every 15 seconds
- :func:`cardano_checkout.monitor.reprice_expired_invoices` every 60 seconds
That's the *full* SDK job surface. The subscription-level + grace-period
jobs that the original TradeCraft scheduler shipped are TradeCraft-specific
(they touch ``Company``, ``Subscription``, ``SubscriptionPayment`` models
that are merchant-specific) those live in
:mod:`cardano_checkout.tradecraft_compat` so TradeCraft can still import the
exact wrappers it has always used, without polluting the generic SDK.
Usage::
from cardano_checkout.scheduler import InvoiceScheduler
from my_app.store import MySqlInvoiceStore
scheduler = InvoiceScheduler(store=MySqlInvoiceStore())
await scheduler.start()
# ... app runs ...
await scheduler.stop()
"""
Cardano Payment Monitoring Scheduler
APScheduler integration that runs five recurring jobs:
- check_pending_payments every 15 seconds
- reprice_expired_payments every 60 seconds
- _check_subscription_payments every 60 seconds
- _reprice_subscription_payments every 6 hours
- _enforce_grace_period daily at 06:00 UTC
from __future__ import annotations
Usage in main.py lifespan:
from services.cardano_scheduler import start_cardano_scheduler, stop_cardano_scheduler
@asynccontextmanager
async def lifespan(app: FastAPI):
await start_cardano_scheduler()
yield
await stop_cardano_scheduler()
"""
import logging
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from dataclasses import dataclass, field
from typing import Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select
from database import async_session_maker
from services.cardano_monitor import (
_check_address_utxos,
_evaluate_payment,
check_pending_payments,
reprice_expired_payments,
from cardano_checkout.monitor import (
DEFAULT_MAX_REPRICINGS,
DEFAULT_PAYMENT_WINDOW_MINUTES,
KOIOS_URL,
check_pending_invoices,
reprice_expired_invoices,
)
from services.cardano_price import convert_usd_to_lovelace, get_ada_usd_price
from cardano_checkout.store import InvoiceStore
logger = logging.getLogger(__name__)
_scheduler: Optional[AsyncIOScheduler] = None
@dataclass
class InvoiceScheduler:
"""APScheduler harness around the monitor loop.
# =============================================================================
# Scheduled job wrappers — invoice payments
# =============================================================================
async def _job_check_pending() -> None:
"""Scheduler wrapper for check_pending_payments."""
try:
async with async_session_maker() as db:
await check_pending_payments(db)
except Exception:
logger.exception("[cardano-scheduler] check_pending_payments job failed")
async def _job_reprice_expired() -> None:
"""Scheduler wrapper for reprice_expired_payments."""
try:
async with async_session_maker() as db:
await reprice_expired_payments(db)
except Exception:
logger.exception("[cardano-scheduler] reprice_expired_payments job failed")
# =============================================================================
# Scheduled job wrappers — subscription payments
# =============================================================================
async def _job_check_subscription_payments() -> None:
Attributes:
store: Persistence backend. Required.
koios_url: Chain-query endpoint. Override for testnet / custom gateways.
check_interval_seconds: How often to poll Koios for pending invoices.
Defaults to 15 identical to TradeCraft's production cadence.
reprice_interval_seconds: How often to sweep for expired invoices.
Defaults to 60.
payment_window_minutes: Re-expiry window when repricing.
max_repricings: How many times an invoice can reprice before giving up.
limit: Max invoices examined per poll cycle.
job_id_prefix: Scheduler job-id namespace. Override if running multiple
``InvoiceScheduler`` instances in one process.
"""
Poll Koios for UTXOs at awaiting_payment subscription addresses.
Reuses _check_address_utxos and _evaluate_payment from cardano_monitor.
On confirmation, advances subscription status to 'active' and updates
company.subscription_tier to match the subscription's tier.
"""
from models import Company, Subscription, SubscriptionPayment
store: InvoiceStore
koios_url: str = KOIOS_URL
check_interval_seconds: int = 15
reprice_interval_seconds: int = 60
payment_window_minutes: int = DEFAULT_PAYMENT_WINDOW_MINUTES
max_repricings: int = DEFAULT_MAX_REPRICINGS
limit: int = 100
job_id_prefix: str = "cardano_checkout"
_scheduler: Optional[AsyncIOScheduler] = field(default=None, init=False, repr=False)
try:
async with async_session_maker() as db:
now = datetime.now(timezone.utc)
result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status.in_(["awaiting_payment", "underpaid"]),
SubscriptionPayment.expires_at > now,
)
async def _job_check_pending(self) -> None:
try:
await check_pending_invoices(
self.store, koios_url=self.koios_url, limit=self.limit
)
payments = result.scalars().all()
if not payments:
return
logger.debug(
"[cardano-scheduler] Checking %d subscription payment(s)", len(payments)
except Exception:
logger.exception(
"[cardano-scheduler] check_pending_invoices job failed"
)
for sp in payments:
try:
utxos = await _check_address_utxos(sp.address)
# _evaluate_payment expects a CardanoPayment-like object.
# SubscriptionPayment has the same fields it needs.
new_status, raw_lovelace, total_value, received_assets, tx_hash = (
await _evaluate_payment(sp, utxos)
)
# Map monitor statuses to subscription payment statuses
status_map = {
"pending": "awaiting_payment",
"confirmed": "confirmed",
"overpaid": "overpaid",
"underpaid": "underpaid",
}
mapped_status = status_map.get(new_status, new_status)
if mapped_status == sp.status and raw_lovelace == 0:
continue
sp.received_lovelace = raw_lovelace
sp.total_value_lovelace = total_value
sp.received_assets = received_assets
if tx_hash:
sp.tx_hash = tx_hash
if mapped_status != sp.status:
old_status = sp.status
sp.status = mapped_status
if mapped_status in ("confirmed", "overpaid"):
sp.confirmed_at = now
# Advance subscription + company tier
if sp.subscription_id:
sub_result = await db.execute(
select(Subscription).where(
Subscription.id == sp.subscription_id
)
)
sub = sub_result.scalar_one_or_none()
if sub:
sub.status = "active"
sub.updated_at = now
# Apply any pending tier downgrade
if sub.pending_tier and sp.period_end and sp.period_end <= now.date():
sub.tier = sub.pending_tier
sub.pending_tier = None
sub.pending_tier_at = None
company_result = await db.execute(
select(Company).where(Company.id == sp.company_id)
)
company = company_result.scalar_one_or_none()
if company:
# Get current sub tier
sub_result2 = await db.execute(
select(Subscription).where(
Subscription.company_id == sp.company_id
)
)
sub2 = sub_result2.scalar_one_or_none()
if sub2:
company.subscription_tier = sub2.tier
company.subscription_status = "active"
logger.info(
"[cardano-scheduler] sub_payment #%d company_id=%d: %s -> %s"
" (%.6f ADA received)",
sp.id,
sp.company_id,
old_status,
mapped_status,
raw_lovelace / 1_000_000,
)
except Exception as e:
logger.exception(
"[cardano-scheduler] Error checking sub_payment #%d: %s", sp.id, e
)
await db.commit()
except Exception:
logger.exception("[cardano-scheduler] _check_subscription_payments job failed")
async def _job_reprice_subscription_payments() -> None:
"""
Reprice awaiting_payment subscription records whose 24-hour window has expired.
Fetches the current ADA price, recalculates expected_lovelace, resets expires_at
to now + 24 hours, and increments repriced_count. Gives up after 3 repricings.
"""
from models import SubscriptionPayment
try:
async with async_session_maker() as db:
now = datetime.now(timezone.utc)
result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status == "awaiting_payment",
SubscriptionPayment.expires_at <= now,
SubscriptionPayment.repriced_count < 3,
)
async def _job_reprice_expired(self) -> None:
try:
await reprice_expired_invoices(
self.store,
window_minutes=self.payment_window_minutes,
max_repricings=self.max_repricings,
limit=self.limit,
)
payments = result.scalars().all()
if not payments:
return
logger.info(
"[cardano-scheduler] Repricing %d subscription payment(s)", len(payments)
except Exception:
logger.exception(
"[cardano-scheduler] reprice_expired_invoices job failed"
)
ada_price = await get_ada_usd_price()
if ada_price <= 0:
logger.warning(
"[cardano-scheduler] Cannot reprice subscriptions — ADA price unavailable"
)
return
async def start(self) -> None:
"""Start the scheduler. Safe to call repeatedly."""
if self._scheduler and self._scheduler.running:
logger.debug("[cardano-scheduler] Already running — skipping start")
return
new_expires_at = now + timedelta(hours=24)
self._scheduler = AsyncIOScheduler()
for sp in payments:
try:
total_usd = float(sp.expected_usd or 0)
if total_usd <= 0:
sp.status = "expired"
logger.warning(
"[cardano-scheduler] sub_payment #%d has no expected_usd — expired",
sp.id,
)
continue
self._scheduler.add_job(
self._job_check_pending,
trigger=IntervalTrigger(seconds=self.check_interval_seconds),
id=f"{self.job_id_prefix}_check_pending",
name="Cardano: Check Pending Invoices",
replace_existing=True,
max_instances=1,
coalesce=True,
)
new_lovelace = await convert_usd_to_lovelace(total_usd)
if new_lovelace == 0:
logger.warning(
"[cardano-scheduler] sub_payment #%d: lovelace conversion returned 0, skipping",
sp.id,
)
continue
self._scheduler.add_job(
self._job_reprice_expired,
trigger=IntervalTrigger(seconds=self.reprice_interval_seconds),
id=f"{self.job_id_prefix}_reprice_expired",
name="Cardano: Reprice Expired Invoices",
replace_existing=True,
max_instances=1,
coalesce=True,
)
old_lovelace = sp.expected_lovelace
sp.expected_lovelace = new_lovelace
sp.ada_price_usd = Decimal(str(round(ada_price, 4)))
sp.expires_at = new_expires_at
sp.repriced_count += 1
self._scheduler.start()
logger.info(
"[cardano-scheduler] Repriced sub_payment #%d: %d -> %d lovelace"
" (ADA=$%.4f, reprice #%d)",
sp.id,
old_lovelace or 0,
new_lovelace,
ada_price,
sp.repriced_count,
)
logger.info(
"[cardano-scheduler] Started — check_pending every %ds, reprice_expired every %ds",
self.check_interval_seconds,
self.reprice_interval_seconds,
)
except Exception as e:
logger.exception(
"[cardano-scheduler] Error repricing sub_payment #%d: %s", sp.id, e
)
# Mark max-repriced payments as expired
expired_result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status == "awaiting_payment",
SubscriptionPayment.expires_at <= now,
SubscriptionPayment.repriced_count >= 3,
)
)
for sp in expired_result.scalars().all():
sp.status = "expired"
logger.info(
"[cardano-scheduler] sub_payment #%d expired after %d repricings",
sp.id,
sp.repriced_count,
)
await db.commit()
except Exception:
logger.exception("[cardano-scheduler] _reprice_subscription_payments job failed")
async def stop(self) -> None:
"""Stop the scheduler. Idempotent."""
if self._scheduler:
self._scheduler.shutdown(wait=False)
self._scheduler = None
logger.info("[cardano-scheduler] Stopped")
async def _job_enforce_grace_period() -> None:
"""
Daily enforcement of subscription grace periods (runs at 06:00 UTC).
# ---------------------------------------------------------------------------
# Backwards-compatible free-function API
# ---------------------------------------------------------------------------
#
# Early adopters may have imported ``start_cardano_scheduler`` / ``stop_cardano_scheduler``
# directly. Provide those as thin wrappers around a module-level default instance.
# Using the InvoiceScheduler class is preferred for anything nontrivial.
Rules:
- If due_date has passed and payment is not confirmed: mark subscription past_due
- If grace_deadline has passed and payment still not confirmed: suspend subscription
and update company.subscription_status accordingly
"""
from models import Company, Subscription, SubscriptionPayment
try:
async with async_session_maker() as db:
today = datetime.now(timezone.utc).date()
# Find subscriptions that are active/past_due and have an overdue payment
overdue_result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status.in_(["awaiting_payment", "underpaid", "expired"]),
SubscriptionPayment.due_date < today,
)
)
overdue_payments = overdue_result.scalars().all()
for sp in overdue_payments:
try:
sub_result = await db.execute(
select(Subscription).where(
Subscription.company_id == sp.company_id
)
)
sub = sub_result.scalar_one_or_none()
if not sub or sub.status in ("cancelled", "suspended"):
continue
company_result = await db.execute(
select(Company).where(Company.id == sp.company_id)
)
company = company_result.scalar_one_or_none()
# Grace deadline passed → suspend
if sp.grace_deadline and today > sp.grace_deadline:
if sub.status != "suspended":
sub.status = "suspended"
sub.updated_at = datetime.now(timezone.utc)
if company:
company.subscription_status = "suspended"
logger.info(
"[cardano-scheduler] company_id=%d subscription suspended"
" (grace deadline %s passed)",
sp.company_id,
sp.grace_deadline,
)
# Due date passed but within grace → mark past_due
elif sub.status == "active":
sub.status = "past_due"
sub.updated_at = datetime.now(timezone.utc)
if company:
company.subscription_status = "past_due"
logger.info(
"[cardano-scheduler] company_id=%d subscription past_due"
" (due_date %s passed)",
sp.company_id,
sp.due_date,
)
except Exception as e:
logger.exception(
"[cardano-scheduler] Error enforcing grace period for sub_payment #%d: %s",
sp.id,
e,
)
await db.commit()
except Exception:
logger.exception("[cardano-scheduler] _enforce_grace_period job failed")
_default: Optional[InvoiceScheduler] = None
# =============================================================================
# Lifecycle
# =============================================================================
async def start_cardano_scheduler() -> None:
"""
Start the Cardano payment monitoring scheduler.
Registers five jobs:
- check_pending_payments: every 15 seconds
- reprice_expired_payments: every 60 seconds
- check_subscription_payments: every 60 seconds
- reprice_subscription_payments: every 6 hours
- enforce_grace_period: daily at 06:00 UTC
Safe to call multiple times skips if already running.
"""
global _scheduler
if _scheduler and _scheduler.running:
logger.debug("[cardano-scheduler] Already running — skipping start")
return
_scheduler = AsyncIOScheduler()
_scheduler.add_job(
_job_check_pending,
trigger=IntervalTrigger(seconds=15),
id="cardano_check_pending",
name="Cardano: Check Pending Payments",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_scheduler.add_job(
_job_reprice_expired,
trigger=IntervalTrigger(seconds=60),
id="cardano_reprice_expired",
name="Cardano: Reprice Expired Payments",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_scheduler.add_job(
_job_check_subscription_payments,
trigger=IntervalTrigger(seconds=60),
id="cardano_check_sub_payments",
name="Cardano: Check Subscription Payments",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_scheduler.add_job(
_job_reprice_subscription_payments,
trigger=IntervalTrigger(hours=6),
id="cardano_reprice_sub_payments",
name="Cardano: Reprice Subscription Payments",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_scheduler.add_job(
_job_enforce_grace_period,
trigger=CronTrigger(hour=6, minute=0, timezone="UTC"),
id="cardano_enforce_grace",
name="Cardano: Enforce Subscription Grace Periods",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_scheduler.start()
logger.info(
"[cardano-scheduler] Started — check_pending every 15s, reprice_expired every 60s,"
" check_sub_payments every 60s, reprice_sub_payments every 6h,"
" enforce_grace_period daily at 06:00 UTC"
)
async def start_cardano_scheduler(
store: InvoiceStore, **kwargs
) -> InvoiceScheduler: # pragma: no cover — convenience shim
"""Start the default :class:`InvoiceScheduler` singleton."""
global _default
if _default is None:
_default = InvoiceScheduler(store=store, **kwargs)
await _default.start()
return _default
async def stop_cardano_scheduler() -> None:
"""
Gracefully stop the Cardano scheduler.
Call this in the FastAPI lifespan shutdown block.
"""
global _scheduler
if _scheduler:
_scheduler.shutdown(wait=False)
_scheduler = None
logger.info("[cardano-scheduler] Stopped")
async def stop_cardano_scheduler() -> None: # pragma: no cover — convenience shim
"""Stop the default :class:`InvoiceScheduler` singleton."""
global _default
if _default is not None:
await _default.stop()
_default = None

View file

@ -8,10 +8,17 @@ The SDK does not prescribe a database. Consumers implement
All methods are async so the same Protocol works cleanly for both
asyncpg/asyncio-sqlalchemy backends and synchronous backends wrapped
with ``asyncio.to_thread``.
This module also ships :class:`InMemoryStore` a reference implementation
used by the test suite and useful as a drop-in for local development or
ephemeral workflows that don't need durability.
"""
from __future__ import annotations
import asyncio
import copy
from dataclasses import dataclass, field
from typing import Optional, Protocol, runtime_checkable
from cardano_checkout.invoice import Invoice, InvoiceStatus
@ -67,3 +74,83 @@ class InvoiceStore(Protocol):
state.
"""
...
# ---------------------------------------------------------------------------
# Reference implementation: InMemoryStore
# ---------------------------------------------------------------------------
@dataclass
class InMemoryStore:
"""In-memory :class:`InvoiceStore` — intended for tests and local dev.
Uses an ``asyncio.Lock`` around mutating operations so concurrent callers
see a consistent view. Objects are deep-copied on read so callers can't
mutate the stored state by accident.
"""
_invoices: dict[str, Invoice] = field(default_factory=dict)
_tx_log: dict[tuple[str, str], int] = field(default_factory=dict)
_index_counters: dict[str, int] = field(default_factory=dict)
_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
async def create(self, invoice: Invoice) -> None:
async with self._lock:
if invoice.id in self._invoices:
raise ValueError(f"Invoice {invoice.id!r} already exists")
self._invoices[invoice.id] = copy.deepcopy(invoice)
# Bump the per-merchant index cursor so next_derivation_index doesn't
# hand out the same slot again.
cur = self._index_counters.get(invoice.merchant_id, -1)
if invoice.derivation_index > cur:
self._index_counters[invoice.merchant_id] = invoice.derivation_index
async def get(self, invoice_id: str) -> Optional[Invoice]:
async with self._lock:
stored = self._invoices.get(invoice_id)
return copy.deepcopy(stored) if stored else None
async def list_by_status(
self, status: InvoiceStatus, limit: int = 100
) -> list[Invoice]:
async with self._lock:
matching = [
copy.deepcopy(inv)
for inv in self._invoices.values()
if inv.status == status
]
# Newest first per contract.
matching.sort(key=lambda inv: inv.created_at, reverse=True)
return matching[:limit]
async def update(self, invoice: Invoice) -> None:
async with self._lock:
if invoice.id not in self._invoices:
raise KeyError(f"Invoice {invoice.id!r} not found")
self._invoices[invoice.id] = copy.deepcopy(invoice)
async def next_derivation_index(self, merchant_id: str) -> int:
async with self._lock:
nxt = self._index_counters.get(merchant_id, -1) + 1
self._index_counters[merchant_id] = nxt
return nxt
async def record_tx(
self, invoice_id: str, tx_hash: str, lovelace_delta: int
) -> None:
async with self._lock:
# Idempotent: (invoice_id, tx_hash) overwrites the delta rather than
# adding it a second time — see InvoiceStore.record_tx contract.
self._tx_log[(invoice_id, tx_hash)] = lovelace_delta
inv = self._invoices.get(invoice_id)
if inv and tx_hash not in inv.tx_hashes:
inv.tx_hashes.append(tx_hash)
# --- test helpers (not part of the Protocol) ---
def _all(self) -> list[Invoice]:
return [copy.deepcopy(inv) for inv in self._invoices.values()]
def _tx_records(self) -> dict[tuple[str, str], int]:
return dict(self._tx_log)

View file

@ -0,0 +1,440 @@
"""TradeCraft-specific compatibility shim.
TradeCraft's ``services/cardano_scheduler.py`` shipped five jobs, of which
only two ``check_pending_payments`` and ``reprice_expired_payments``
are generic invoice logic. The remaining three
(``_check_subscription_payments``, ``_reprice_subscription_payments``,
``_enforce_grace_period``) manipulate TradeCraft's ``Company``,
``Subscription``, and ``SubscriptionPayment`` SQLAlchemy models directly;
those are merchant-specific concerns that do not belong in the generic SDK.
This module preserves the original TradeCraft import surface so the
existing TradeCraft code path still works while the migration to
:class:`cardano_checkout.store.InvoiceStore` is in-flight. None of the
symbols here are meant to be used by new consumers.
**Do not depend on this module outside TradeCraft.** It is scheduled to
be removed once TradeCraft migrates fully to the Protocol-based API (see
TODO in the repo README).
All functions in here are *verbatim* lifts from the original
``services/cardano_scheduler.py`` TradeCraft's ``models`` + ``database``
modules are imported lazily so that importing this module never fails
for non-TradeCraft consumers. If TradeCraft's models are not importable
the jobs raise at call time, not at import time.
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from cardano_checkout.monitor import check_address_utxos, evaluate_utxos
from cardano_checkout.oracles import convert_usd_to_lovelace, get_ada_usd_price
logger = logging.getLogger(__name__)
# Original TradeCraft free-function names — keep these stable.
_check_address_utxos = check_address_utxos
_evaluate_payment = evaluate_utxos
def _require_tradecraft_models():
"""Lazy import of the TradeCraft-specific models + session maker.
Returns ``(Company, Subscription, SubscriptionPayment, async_session_maker)``.
Raises ImportError with a clear message if TradeCraft isn't installed.
"""
try:
from database import async_session_maker # type: ignore[import-not-found]
from models import ( # type: ignore[import-not-found]
Company,
Subscription,
SubscriptionPayment,
)
except ImportError as exc: # pragma: no cover — only meaningful inside TradeCraft
raise ImportError(
"cardano_checkout.tradecraft_compat requires the TradeCraft app's "
"`models` + `database` modules to be importable. This shim is "
"only meant to be used from within TradeCraft itself."
) from exc
return Company, Subscription, SubscriptionPayment, async_session_maker
# ---------------------------------------------------------------------------
# Subscription payments job — verbatim TradeCraft logic
# ---------------------------------------------------------------------------
async def check_subscription_payments() -> None: # pragma: no cover — TradeCraft-only
"""Poll Koios for UTXOs at awaiting_payment subscription addresses.
On confirmation, advances ``Subscription.status`` to ``"active"`` and
updates ``Company.subscription_tier``. TradeCraft-specific.
"""
from sqlalchemy import select
Company, Subscription, SubscriptionPayment, async_session_maker = (
_require_tradecraft_models()
)
try:
async with async_session_maker() as db:
now = datetime.now(timezone.utc)
result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status.in_(["awaiting_payment", "underpaid"]),
SubscriptionPayment.expires_at > now,
)
)
payments = result.scalars().all()
if not payments:
return
logger.debug(
"[tradecraft-compat] Checking %d subscription payment(s)",
len(payments),
)
for sp in payments:
try:
utxos = await _check_address_utxos(sp.address)
expected = sp.expected_lovelace or 0
(
new_status_enum,
raw_lovelace,
total_value,
received_assets,
tx_hash,
) = await _evaluate_payment(expected, utxos)
new_status = new_status_enum.value
status_map = {
"pending": "awaiting_payment",
"confirmed": "confirmed",
"overpaid": "overpaid",
"underpaid": "underpaid",
}
mapped_status = status_map.get(new_status, new_status)
if mapped_status == sp.status and raw_lovelace == 0:
continue
sp.received_lovelace = raw_lovelace
sp.total_value_lovelace = total_value
sp.received_assets = received_assets
if tx_hash:
sp.tx_hash = tx_hash
if mapped_status != sp.status:
old_status = sp.status
sp.status = mapped_status
if mapped_status in ("confirmed", "overpaid"):
sp.confirmed_at = now
if sp.subscription_id:
sub_result = await db.execute(
select(Subscription).where(
Subscription.id == sp.subscription_id
)
)
sub = sub_result.scalar_one_or_none()
if sub:
sub.status = "active"
sub.updated_at = now
if (
sub.pending_tier
and sp.period_end
and sp.period_end <= now.date()
):
sub.tier = sub.pending_tier
sub.pending_tier = None
sub.pending_tier_at = None
company_result = await db.execute(
select(Company).where(Company.id == sp.company_id)
)
company = company_result.scalar_one_or_none()
if company:
sub_result2 = await db.execute(
select(Subscription).where(
Subscription.company_id == sp.company_id
)
)
sub2 = sub_result2.scalar_one_or_none()
if sub2:
company.subscription_tier = sub2.tier
company.subscription_status = "active"
logger.info(
"[tradecraft-compat] sub_payment #%d company_id=%d: "
"%s -> %s (%.6f ADA received)",
sp.id,
sp.company_id,
old_status,
mapped_status,
raw_lovelace / 1_000_000,
)
except Exception as e:
logger.exception(
"[tradecraft-compat] Error checking sub_payment #%d: %s",
sp.id,
e,
)
await db.commit()
except Exception:
logger.exception(
"[tradecraft-compat] check_subscription_payments job failed"
)
async def reprice_subscription_payments() -> None: # pragma: no cover — TradeCraft-only
"""Reprice expired subscription payments — 24h window, 3-reprice cap."""
from sqlalchemy import select
_, _, SubscriptionPayment, async_session_maker = _require_tradecraft_models()
try:
async with async_session_maker() as db:
now = datetime.now(timezone.utc)
result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status == "awaiting_payment",
SubscriptionPayment.expires_at <= now,
SubscriptionPayment.repriced_count < 3,
)
)
payments = result.scalars().all()
if not payments:
return
logger.info(
"[tradecraft-compat] Repricing %d subscription payment(s)",
len(payments),
)
ada_price = await get_ada_usd_price()
if ada_price <= 0:
logger.warning(
"[tradecraft-compat] Cannot reprice subscriptions — "
"ADA price unavailable"
)
return
new_expires_at = now + timedelta(hours=24)
for sp in payments:
try:
total_usd = float(sp.expected_usd or 0)
if total_usd <= 0:
sp.status = "expired"
continue
new_lovelace = await convert_usd_to_lovelace(total_usd)
if new_lovelace == 0:
continue
old_lovelace = sp.expected_lovelace
sp.expected_lovelace = new_lovelace
sp.ada_price_usd = Decimal(str(round(ada_price, 4)))
sp.expires_at = new_expires_at
sp.repriced_count += 1
logger.info(
"[tradecraft-compat] Repriced sub_payment #%d: %d -> %d "
"lovelace (ADA=$%.4f, reprice #%d)",
sp.id,
old_lovelace or 0,
new_lovelace,
ada_price,
sp.repriced_count,
)
except Exception as e:
logger.exception(
"[tradecraft-compat] Error repricing sub_payment #%d: %s",
sp.id,
e,
)
expired_result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status == "awaiting_payment",
SubscriptionPayment.expires_at <= now,
SubscriptionPayment.repriced_count >= 3,
)
)
for sp in expired_result.scalars().all():
sp.status = "expired"
logger.info(
"[tradecraft-compat] sub_payment #%d expired after %d repricings",
sp.id,
sp.repriced_count,
)
await db.commit()
except Exception:
logger.exception(
"[tradecraft-compat] reprice_subscription_payments job failed"
)
async def enforce_grace_period() -> None: # pragma: no cover — TradeCraft-only
"""Daily grace-period enforcement — past_due / suspended transitions."""
from sqlalchemy import select
Company, Subscription, SubscriptionPayment, async_session_maker = (
_require_tradecraft_models()
)
try:
async with async_session_maker() as db:
today = datetime.now(timezone.utc).date()
overdue_result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status.in_(
["awaiting_payment", "underpaid", "expired"]
),
SubscriptionPayment.due_date < today,
)
)
overdue_payments = overdue_result.scalars().all()
for sp in overdue_payments:
try:
sub_result = await db.execute(
select(Subscription).where(
Subscription.company_id == sp.company_id
)
)
sub = sub_result.scalar_one_or_none()
if not sub or sub.status in ("cancelled", "suspended"):
continue
company_result = await db.execute(
select(Company).where(Company.id == sp.company_id)
)
company = company_result.scalar_one_or_none()
if sp.grace_deadline and today > sp.grace_deadline:
if sub.status != "suspended":
sub.status = "suspended"
sub.updated_at = datetime.now(timezone.utc)
if company:
company.subscription_status = "suspended"
logger.info(
"[tradecraft-compat] company_id=%d suspended "
"(grace deadline %s passed)",
sp.company_id,
sp.grace_deadline,
)
elif sub.status == "active":
sub.status = "past_due"
sub.updated_at = datetime.now(timezone.utc)
if company:
company.subscription_status = "past_due"
logger.info(
"[tradecraft-compat] company_id=%d past_due "
"(due_date %s passed)",
sp.company_id,
sp.due_date,
)
except Exception as e:
logger.exception(
"[tradecraft-compat] Error enforcing grace period "
"for sub_payment #%d: %s",
sp.id,
e,
)
await db.commit()
except Exception:
logger.exception(
"[tradecraft-compat] enforce_grace_period job failed"
)
# ---------------------------------------------------------------------------
# Standalone TradeCraft scheduler — registers the subscription jobs only.
# ---------------------------------------------------------------------------
_tc_scheduler: Optional[AsyncIOScheduler] = None
async def start_tradecraft_scheduler() -> None: # pragma: no cover — TradeCraft-only
"""Start ONLY the TradeCraft-specific subscription + grace-period jobs.
The generic invoice jobs should be run via :class:`InvoiceScheduler`
against a ``SQLAlchemyInvoiceStore`` adapter (not shipped here
TradeCraft is responsible for implementing it during the migration).
"""
global _tc_scheduler
if _tc_scheduler and _tc_scheduler.running:
return
_tc_scheduler = AsyncIOScheduler()
_tc_scheduler.add_job(
check_subscription_payments,
trigger=IntervalTrigger(seconds=60),
id="tradecraft_check_sub_payments",
name="TradeCraft: Check Subscription Payments",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_tc_scheduler.add_job(
reprice_subscription_payments,
trigger=IntervalTrigger(hours=6),
id="tradecraft_reprice_sub_payments",
name="TradeCraft: Reprice Subscription Payments",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_tc_scheduler.add_job(
enforce_grace_period,
trigger=CronTrigger(hour=6, minute=0, timezone="UTC"),
id="tradecraft_enforce_grace",
name="TradeCraft: Enforce Subscription Grace Periods",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_tc_scheduler.start()
logger.info(
"[tradecraft-compat] Started — subscription + grace-period jobs only"
)
async def stop_tradecraft_scheduler() -> None: # pragma: no cover — TradeCraft-only
"""Stop the TradeCraft-specific scheduler."""
global _tc_scheduler
if _tc_scheduler:
_tc_scheduler.shutdown(wait=False)
_tc_scheduler = None