v1.0.0-dev: slim to the real product — merchant state machine only

Drop everything that duplicates PyCardano. The landscape scan done
2026-04-23 confirmed: no ecosystem gap for wallet/chain/tx-build —
pycardano 0.19.x covers all of it cleanly. The gap is the merchant
state machine, so that's all we ship.

Deleted:
- addresses.py  → consumers call pycardano.HDWallet directly
- txbuild.py    → consumers use pycardano.OgmiosChainContext directly
- oracles.py    → consumers supply a price_fn callable
- mint.py       → consumers build mint txs with pycardano;
                  CIP-25 v2 metadata builder shipped as a copy-paste
                  snippet in the README
- ipfs.py       → py-ipfs-http-client covers it
- tradecraft_compat.py → no one was importing it; kill
- docs/minting-workflow.md → redundant with README pairing guidance

Refactored:
- monitor.evaluate_utxos: ADA-only. The DexHunter token-equivalent
  block came out. Consumers who want stablecoin support wrap the
  function with their own asset-to-lovelace converter.
- monitor.reprice_expired_invoices: takes a new required kwarg
  price_fn: Callable[[float], Awaitable[int]]. No more ADA/USD
  oracle shipped in the SDK.
- scheduler.InvoiceScheduler: takes an optional price_fn field;
  if None, the reprice job is a no-op (works for fixed-ADA invoices).

Tests (26/26 passing, all offline):
- test_invoice.py — state-machine helpers, 3 tests
- test_store_protocol.py — Protocol conformance + InMemoryStore round-trips, 13 tests
- test_monitor_with_inmemory_store.py — all status transitions + reprice,
  rewired to pass price_fn fixtures instead of monkeypatching oracle funcs

Deps dropped: pycardano (consumer pairing, not our dep).
Deps kept: httpx (Koios), apscheduler (background scheduler).

Package shape (1.0.0-dev, ~700 LOC src):

  cardano_checkout/
    invoice.py    —  Invoice + InvoiceStatus
    store.py      —  InvoiceStore Protocol + InMemoryStore
    monitor.py    —  Koios poll + UTxO matching + reprice driver
    scheduler.py  —  APScheduler wrapper

README rewritten top-to-bottom: "what we ship", "what we don't ship",
why the niche exists, pycardano-directly examples for the delete-list,
CIP-25 builder as a 20-line copy-paste, InvoiceStore implementation
example. Apache-2.0 license unchanged.
This commit is contained in:
Kayos 2026-04-23 21:58:26 -07:00
parent 68cb535c0f
commit af41f945b1
16 changed files with 286 additions and 2714 deletions

View file

@ -1,68 +1,71 @@
"""cardano_checkout — Python SDK for merchant-side Cardano payments + NFT cert minting.
"""cardano-checkout — merchant-side Cardano payment lifecycle in Python.
Zero-custody by design: consumers provide a wallet xpub (account-level
extended public key). The SDK derives unique receive addresses per
invoice, polls the chain for payment, and (optionally) mints a CIP-25
NFT certificate of authenticity on confirmation.
Zero-custody by design: the merchant brings a wallet xpub and an
:class:`~cardano_checkout.store.InvoiceStore` implementation. The SDK
owns the payment lifecycle per-invoice receive-address bookkeeping,
Koios UTxO polling, confirm / underpay / overpay classification, and
time-windowed repricing against a consumer-supplied oracle.
**The SDK deliberately does NOT ship Cardano primitives.** Address
derivation, transaction building, chain context, and native-script
minting all live in `pycardano <https://github.com/Python-Cardano/pycardano>`_
and are consumer concerns. See the README for the pairing pattern and
for the CIP-25 v2 metadata-builder snippet (a 60-line helper that fits
anywhere in your own code without needing a separate dep).
Quick start::
from cardano_checkout import addresses, oracles
from cardano_checkout import Invoice, InvoiceStatus, InMemoryStore, InvoiceScheduler
addr = addresses.derive_address(xpub_hex, index=42, network="mainnet")
price = await oracles.get_ada_usd_price()
lovelace = await oracles.convert_usd_to_lovelace(99.00)
store = InMemoryStore() # or your SQLAlchemy / asyncpg / sqlite adapter
For full invoice lifecycle see :mod:`cardano_checkout.invoice` +
:mod:`cardano_checkout.store` (Protocol-based persistence).
async def my_price_fn(usd: float) -> int:
# your oracle — CoinGecko / Koios ticker / fixed rate in tests
rate = await fetch_ada_usd_rate()
return int(round(usd / rate * 1_000_000))
For NFT minting see :mod:`cardano_checkout.mint`.
scheduler = InvoiceScheduler(store=store, price_fn=my_price_fn)
await scheduler.start()
"""
from __future__ import annotations
__version__ = "0.2.0-dev"
__version__ = "1.0.0-dev"
# Pure modules — stable API from extraction
from cardano_checkout import addresses, oracles # noqa: F401
# Payment lifecycle
from cardano_checkout.invoice import Invoice, InvoiceStatus # noqa: F401
from cardano_checkout.store import InMemoryStore, InvoiceStore # noqa: F401
# Monitoring + scheduling
from cardano_checkout.monitor import ( # noqa: F401
CONFIRM_TOLERANCE,
DEFAULT_MAX_REPRICINGS,
DEFAULT_PAYMENT_WINDOW_MINUTES,
KOIOS_URL,
OVERPAY_THRESHOLD,
PriceFn,
check_address_utxos,
check_pending_invoices,
evaluate_utxos,
reprice_expired_invoices,
)
from cardano_checkout.scheduler import InvoiceScheduler # noqa: F401
# NFT + IPFS
from cardano_checkout.mint import ( # noqa: F401
MintPolicy,
UnsignedMint,
build_cip25_metadata,
mint_nft_cert,
submit_signed_tx,
)
from cardano_checkout.ipfs import IPFSClient, pin_bytes # noqa: F401
__all__ = [
"__version__",
"addresses",
"oracles",
# Invoice lifecycle
"Invoice",
"InvoiceStatus",
# Persistence
"InvoiceStore",
"InMemoryStore",
# Monitor + scheduler
"PriceFn",
"InvoiceScheduler",
"check_address_utxos",
"check_pending_invoices",
"evaluate_utxos",
"reprice_expired_invoices",
"MintPolicy",
"UnsignedMint",
"mint_nft_cert",
"submit_signed_tx",
"build_cip25_metadata",
"IPFSClient",
"pin_bytes",
"KOIOS_URL",
"CONFIRM_TOLERANCE",
"OVERPAY_THRESHOLD",
"DEFAULT_MAX_REPRICINGS",
"DEFAULT_PAYMENT_WINDOW_MINUTES",
]

View file

@ -1,236 +0,0 @@
"""
Cardano HD address derivation service.
Derives Cardano base addresses from an account-level extended public key (xpub)
exported from wallets such as Eternl or Lace. Uses BIP-44 derivation via pycardano.
Key derivation path: m / 1852' / 1815' / account' / chain / index
- chain 0 = external (receive) addresses
- chain 2 = staking key (always index 0 for the account)
The xpub accepted here is the *account* public key the root has already been
hardened away by the wallet. We only perform soft derivation from account level
down, so no private key material is ever needed or touched.
"""
import logging
from typing import Optional
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def derive_address(xpub_hex: str, index: int, network: str = "mainnet") -> str:
"""
Derive a Cardano base address at the given receive-address index.
The address is a Shelley-era base address combining:
- payment key: account_xpub / 0 (external chain) / index
- staking key: account_xpub / 2 (staking chain) / 0
Args:
xpub_hex: Hex-encoded account extended public key (64 bytes raw or
96 bytes with chain code, as exported by most CIP-1852 wallets).
index: Receive address index (0-based). Must be >= 0.
network: "mainnet" or "testnet" (preprod / preview). Defaults to mainnet.
Returns:
Bech32-encoded Cardano base address (addr1... or addr_test1...).
Raises:
ValueError: If xpub_hex is malformed, index is negative, or network is invalid.
RuntimeError: If pycardano is not installed or derivation fails unexpectedly.
"""
_require_pycardano()
if index < 0:
raise ValueError(f"Address index must be non-negative, got {index}")
net = _parse_network(network)
acct_pub = _parse_xpub(xpub_hex)
try:
# External receive chain (0) / address index — soft (non-hardened) derivation.
addr_node = acct_pub.derive(0, private=False).derive(index, private=False)
# Staking chain (2) / always index 0 for the account.
stake_node = acct_pub.derive(2, private=False).derive(0, private=False)
except Exception as exc:
logger.exception("[cardano] Key derivation failed at index %d", index)
raise RuntimeError(f"Key derivation failed: {exc}") from exc
from pycardano import (
Address,
PaymentVerificationKey,
StakeVerificationKey,
)
pay_vk = PaymentVerificationKey.from_primitive(addr_node.public_key)
stake_vk = StakeVerificationKey.from_primitive(stake_node.public_key)
address = Address(
payment_part=pay_vk.hash(),
staking_part=stake_vk.hash(),
network=net,
)
return str(address)
def validate_xpub(xpub_hex: str) -> bool:
"""
Validate that an xpub string is well-formed and parseable.
Checks:
- Is a non-empty string
- Is valid hex
- Is a valid pycardano HDPublicKey (correct byte length, valid point on curve)
Args:
xpub_hex: Hex-encoded account extended public key.
Returns:
True if the xpub is valid, False otherwise. Never raises.
"""
if not xpub_hex or not isinstance(xpub_hex, str):
return False
# Quick hex sanity before paying the crypto cost
stripped = xpub_hex.strip()
if not _is_hex(stripped):
return False
try:
_require_pycardano()
node = _parse_xpub(stripped)
# Soft-derive a single child to prove the key is usable — HDWallet
# construction is lazy, so we actually exercise the BIP32 math.
node.derive(0, private=False)
return True
except Exception:
return False
def get_address_preview(xpub_hex: str, network: str = "mainnet") -> str:
"""
Derive the address at index 0 for settings UI preview.
Thin wrapper around derive_address exists so callers don't have to
know or care about the index convention.
Args:
xpub_hex: Hex-encoded account extended public key.
network: "mainnet" or "testnet". Defaults to mainnet.
Returns:
Bech32-encoded Cardano base address at index 0.
Raises:
ValueError: If xpub_hex is malformed or network is invalid.
RuntimeError: If derivation fails unexpectedly.
"""
return derive_address(xpub_hex, index=0, network=network)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _require_pycardano() -> None:
"""Raise a clear RuntimeError if pycardano is not installed."""
try:
import pycardano # noqa: F401
except ImportError as exc:
raise RuntimeError(
"pycardano is required for Cardano address derivation. "
"Add pycardano>=0.11.0 to requirements.txt and reinstall."
) from exc
def _parse_network(network: str):
"""
Parse a network string into a pycardano Network enum value.
Args:
network: "mainnet" or "testnet".
Returns:
pycardano.Network enum member.
Raises:
ValueError: If network is not one of the accepted values.
"""
from pycardano import Network
if network == "mainnet":
return Network.MAINNET
if network == "testnet":
return Network.TESTNET
raise ValueError(
f"Invalid network '{network}'. Expected 'mainnet' or 'testnet'."
)
def _parse_xpub(xpub_hex: str):
"""
Parse a hex-encoded extended public key into a public-only HDWallet node.
pycardano exposes soft-derivation through :class:`pycardano.HDWallet`.
An account-level xpub is 64 bytes (32-byte Ed25519 public key +
32-byte chain code). Some wallets export 96 bytes; if so, we strip
the first 32 bytes which are typically a zeroed / duplicated prefix.
Args:
xpub_hex: Hex-encoded extended public key string.
Returns:
pycardano.HDWallet node rooted at the account level, with private
key fields unset. ``node.derive(index, private=False)`` performs
the soft CIP-1852 derivation we need.
Raises:
ValueError: If the byte length is unexpected or the key is invalid.
"""
from pycardano import HDWallet
try:
raw = bytes.fromhex(xpub_hex.strip())
except ValueError as exc:
raise ValueError(f"xpub_hex is not valid hex: {exc}") from exc
# Standard CIP-1852 account xpub is 64 bytes (pubkey || chain_code).
# Some export formats prepend 32 zeroed or duplicated bytes — handle both.
if len(raw) == 64:
pass # Expected format.
elif len(raw) == 96:
raw = raw[32:]
else:
raise ValueError(
f"Unexpected xpub length: {len(raw)} bytes. "
"Expected 64 bytes (pubkey + chain_code)."
)
public_key = raw[:32]
chain_code = raw[32:]
try:
return HDWallet(
public_key=public_key,
chain_code=chain_code,
path="m/1852'/1815'/0'",
)
except Exception as exc:
raise ValueError(f"xpub is not a valid extended public key: {exc}") from exc
def _is_hex(value: str) -> bool:
"""Return True if every character in value is a valid hex digit."""
if not value:
return False
try:
bytes.fromhex(value)
return True
except ValueError:
return False

View file

@ -1,107 +0,0 @@
"""Minimal IPFS client — upload + pin via kubo's HTTP API.
Designed for the ``chromaticcraft`` shape: a small local kubo daemon
runs alongside the web app, accepts uploads from end users (e.g. Abby
uploading a photo of a finished custom order), pins locally for fast
serving, and optionally mirrors pins to a second remote node
(Lucy-on-LAN) for archival redundancy.
No IPFS libraries are imported just httpx against the kubo REST API
(v0). Keeps the SDK surface minimal.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Optional
import httpx
logger = logging.getLogger(__name__)
@dataclass
class IPFSClient:
"""Kubo-compatible IPFS client.
Attributes:
api_url: Base URL of the kubo HTTP API (default ``http://127.0.0.1:5001``).
timeout: Per-request timeout in seconds (default 60 uploads can be slow).
mirror_api_urls: Optional list of additional kubo endpoints to
``pin add`` the CID on after a successful primary pin. Use this
to mirror to Lucy or any other archival node.
"""
api_url: str = "http://127.0.0.1:5001"
timeout: float = 60.0
mirror_api_urls: list[str] = None # type: ignore[assignment]
def __post_init__(self) -> None:
if self.mirror_api_urls is None:
self.mirror_api_urls = []
async def add(self, data: bytes, filename: str = "upload") -> str:
"""Upload bytes and pin them locally.
Args:
data: Raw bytes to add.
filename: Logical name used by clients browsing the DAG
(doesn't affect the CID).
Returns:
CID (base58, v0 or base32 v1 depending on kubo defaults).
Raises:
RuntimeError: If the daemon is unreachable or returns a non-2xx.
"""
url = f"{self.api_url.rstrip('/')}/api/v0/add"
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.post(
url,
files={"file": (filename, data, "application/octet-stream")},
params={"pin": "true", "cid-version": "1"},
)
if resp.status_code >= 400:
raise RuntimeError(f"ipfs add {resp.status_code}: {resp.text[:200]}")
# kubo's /add streams NDJSON; each line is one {Name, Hash, Size}.
# For a single file upload the last line carries the wrapping CID.
last_cid: Optional[str] = None
for line in resp.text.strip().splitlines():
if '"Hash"' in line:
import json
obj = json.loads(line)
last_cid = obj.get("Hash")
if not last_cid:
raise RuntimeError(f"ipfs add: no CID in response: {resp.text[:200]}")
# Mirror pins (best effort — a mirror failure should not poison the primary upload).
for mirror in self.mirror_api_urls:
try:
await self._pin_on(mirror, last_cid)
except Exception as exc:
logger.warning("[ipfs] mirror pin to %s failed for %s: %s", mirror, last_cid, exc)
return last_cid
async def _pin_on(self, api_url: str, cid: str) -> None:
"""Pin an existing CID on a remote kubo node."""
url = f"{api_url.rstrip('/')}/api/v0/pin/add"
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.post(url, params={"arg": cid})
if resp.status_code >= 400:
raise RuntimeError(f"pin/add {resp.status_code}: {resp.text[:200]}")
async def pin_bytes(
data: bytes,
api_url: str = "http://127.0.0.1:5001",
mirror_api_urls: Optional[list[str]] = None,
filename: str = "upload",
) -> str:
"""Convenience wrapper: one-shot upload + pin (+ optional mirror).
Returns the CID.
"""
client = IPFSClient(api_url=api_url, mirror_api_urls=mirror_api_urls or [])
return await client.add(data, filename=filename)

View file

@ -1,442 +0,0 @@
"""CIP-25 v2 NFT certificate-of-authenticity minting.
This module produces the NFT cert attached to a confirmed merchant
order. One NFT per order, pinned-once metadata (image CID from IPFS
via :mod:`cardano_checkout.ipfs`), sent directly to the customer's
wallet in the same transaction.
Design decisions:
- **CIP-25 v2** (not CIP-68). CIP-25 is universally supported by
every Cardano wallet (Eternl, Lace, Yoroi, Vespr, Typhon). CIP-68
adds reference-NFT mutability we do not need for a static cert.
- **Single policy per merchant studio.** All of a studio's certs share
one policy_id so wallets group them cleanly. The policy key is a
native script under the studio's custody — Sulkta pattern is a
multi-sig native script stored on Lucy.
- **Policy has a time-lock** (invalid-after slot) so the "no more
editions can be minted after X" claim is cryptographically enforceable.
Recommended: generous lock (100 years) so policy_id stays stable,
but revokable in-contract via ``mint policy revoke`` flow.
- **No reference script, no Plutus.** Pure native scripts + standard
CIP-25 metadata keeps the tx cheap (~0.18 ADA fee + min-utxo for the
NFT output).
Cold-signing workflow
---------------------
The mint function does *not* sign. It builds the transaction body + the
auxiliary data, computes the tx id, and returns an :class:`UnsignedMint`
carrying the CBOR-encoded body plus a human-readable summary so the
operator can sanity-check before signing. The operator then:
1. Transfers the unsigned CBOR to the cold host (Lucy, via `scp`, USB,
QR code, whatever the threat model tolerates).
2. Signs offline with the policy-required skey(s) for Sulkta's
chromatic policy that's ``Cobb.skey`` + ``Kayos.skey``.
3. Transfers the signed CBOR back to the hot host.
4. Calls :func:`submit_signed_tx` to hand it to Ogmios.
See ``docs/minting-workflow.md`` for the full operator runbook.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Optional
logger = logging.getLogger(__name__)
if TYPE_CHECKING: # pragma: no cover — hints only
from pycardano import ChainContext
# ---------------------------------------------------------------------------
# Policy model
# ---------------------------------------------------------------------------
@dataclass
class MintPolicy:
"""A native-script minting policy under the SDK's custody model.
Attributes:
policy_id: Hex blake2b-224 hash of the native script CBOR. Stable
for the life of the policy shipped with every cert minted
under it. Becomes the Cardano ``policy_id`` of the NFT asset.
script_cbor_hex: Hex-encoded CBOR of the native script itself.
Submitted alongside the mint tx witness.
required_signer_hashes: Payment-key hashes (hex) of every skey
that must sign the mint tx. For Sulkta's chromatic policy
this is 2 entries: Cobb + Kayos.
locked_after_slot: Optional slot beyond which the policy rejects
further mints. None = no time lock (not recommended for
certificates a lock makes the "no more editions" claim
mathematically verifiable).
"""
policy_id: str
script_cbor_hex: str
required_signer_hashes: list[str] = field(default_factory=list)
locked_after_slot: Optional[int] = None
@dataclass
class UnsignedMint:
"""An unsigned mint transaction, ready to be handed to a cold signer.
Attributes:
tx_id: Transaction hash computed from the body alone (stable across
signing the same id the explorer will show once submitted).
tx_body_cbor_hex: Hex-encoded CBOR of the transaction *body*.
This is what gets moved to the cold host.
auxiliary_data_cbor_hex: Hex-encoded CBOR of the auxiliary data
(metadata + native script). Required to reconstruct the full
transaction before submission.
native_script_cbor_hex: Hex-encoded CBOR of the minting policy's
native script. Needed by the cold signer to construct the
correct witness set.
required_signer_hashes: List of payment-key hashes (hex) the cold
signer must provide. Mirrors ``MintPolicy.required_signer_hashes``.
summary: Human-readable description of the tx operator should
eyeball this before signing to confirm they're signing what
they think they're signing.
"""
tx_id: str
tx_body_cbor_hex: str
auxiliary_data_cbor_hex: str
native_script_cbor_hex: str
required_signer_hashes: list[str]
summary: str
# ---------------------------------------------------------------------------
# Metadata builder (pure, no pycardano dep)
# ---------------------------------------------------------------------------
def build_cip25_metadata(
policy_id: str,
asset_name: str,
name: str,
image_cid: str,
description: str = "",
media_type: str = "image/jpeg",
properties: Optional[dict] = None,
) -> dict:
"""Assemble the ``{721: {...}}`` metadatum envelope for a single NFT.
CIP-25 v2 image field takes an ``ipfs://<CID>`` URI. Description, if
longer than 64 characters, is split into an array of 64-char chunks
(CIP-25 constraint from the Cardano metadata schema strings larger
than 64 chars are encoded as a list of chunks).
Args:
policy_id: Hex policy id (same as on the asset).
asset_name: UTF-8 asset name used as the dict key under policy_id.
name: Human-readable NFT title (shown in wallets).
image_cid: IPFS CID the function prepends ``ipfs://``.
description: Optional longer text. Will be chunked if > 64 chars.
media_type: MIME type of the image. Default ``image/jpeg``.
properties: Additional key/value pairs merged into the metadata blob.
Returns:
Dict ready to submit as tx metadatum label 721.
"""
def chunk64(s: str) -> list[str]:
if len(s) <= 64:
return [s]
return [s[i : i + 64] for i in range(0, len(s), 64)]
desc: object = description
if isinstance(description, str) and len(description) > 64:
desc = chunk64(description)
body: dict = {
"name": name,
"image": f"ipfs://{image_cid}",
"mediaType": media_type,
}
if desc:
body["description"] = desc
if properties:
body.update(properties)
return {
"721": {
policy_id: {
asset_name: body,
},
"version": "2.0",
}
}
# ---------------------------------------------------------------------------
# Mint transaction builder (cold-signer flow)
# ---------------------------------------------------------------------------
def _require_pycardano():
try:
import pycardano # noqa: F401
except ImportError as exc: # pragma: no cover — env sanity
raise RuntimeError(
"pycardano is required for mint transaction construction. "
"Add pycardano>=0.11.0 to requirements.txt and reinstall."
) from exc
def _metadata_dict_with_int_keys(metadata: dict) -> dict:
"""Convert string top-level metadata labels to ints for pycardano Metadata.
CIP-25 v2 nests everything under label ``721``. We accept both ``{"721": ...}``
(builder output) and ``{721: ...}`` (raw) for ergonomics.
"""
converted: dict = {}
for key, val in metadata.items():
try:
converted[int(key)] = val
except (TypeError, ValueError):
converted[key] = val
return converted
async def mint_nft_cert(
policy: MintPolicy,
asset_name: str,
metadata: dict,
recipient_address: str,
funding_address: str,
context: Optional["ChainContext"] = None,
ogmios_host: str = "127.0.0.1",
ogmios_port: int = 1337,
network: str = "mainnet",
min_lovelace_for_nft_utxo: int = 1_500_000,
) -> UnsignedMint:
"""Build an unsigned mint+send transaction for a CIP-25 v2 NFT cert.
Constructs a transaction that:
1. Mints exactly 1 of ``{policy.policy_id}.{asset_name}``.
2. Sends that single token to ``recipient_address`` in its own UTxO
with the minimum-ADA padding (default 1.5 ADA).
3. Attaches the CIP-25 v2 metadata (label 721) + the policy's
native script as tx auxiliary data.
4. Returns the unsigned body for the cold signer to sign does NOT
sign, does NOT submit.
UTxOs for fees + min-ADA are sourced from ``funding_address`` (the
merchant's hot wallet on Rackham, which does not hold any policy keys).
Args:
policy: Merchant's minting policy.
asset_name: UTF-8 asset name (will be hex-encoded per CIP-25). Max 32 bytes.
metadata: CIP-25 metadata dict typically the output of
:func:`build_cip25_metadata`. Accepts ``{"721": ...}`` or ``{721: ...}``.
recipient_address: Bech32 address of the wallet that receives the NFT.
funding_address: Bech32 address that pays the tx fee + NFT min-ADA.
context: Optional chain context. If omitted a fresh
:class:`pycardano.OgmiosChainContext` is built from
``ogmios_host``/``ogmios_port``.
ogmios_host: Host of the local Ogmios HTTP+WS endpoint.
ogmios_port: Port of the local Ogmios endpoint.
network: ``"mainnet"`` or ``"testnet"`` (preprod / preview).
min_lovelace_for_nft_utxo: ADA (in lovelace) to attach to the NFT
output so it satisfies the ledger's min-UTxO floor. Default 1.5 ADA.
Returns:
:class:`UnsignedMint` bundle ready for the cold-signer hand-off.
Raises:
RuntimeError: If pycardano is unavailable, or tx construction fails.
ValueError: If ``asset_name`` is empty or > 32 bytes.
"""
_require_pycardano()
if not asset_name or len(asset_name.encode("utf-8")) > 32:
raise ValueError(
"asset_name must be a non-empty UTF-8 string <= 32 bytes "
f"(got {len(asset_name.encode('utf-8'))} bytes)"
)
from pycardano import (
Address,
Asset,
AssetName,
AuxiliaryData,
Metadata,
MultiAsset,
NativeScript,
Network,
ScriptHash,
TransactionBuilder,
TransactionOutput,
Value,
)
if context is None:
from cardano_checkout.txbuild import make_ogmios_context
context = make_ogmios_context(
host=ogmios_host, port=ogmios_port, network=network
)
net = Network.MAINNET if network == "mainnet" else Network.TESTNET
# ------------------------------------------------------------------
# Assemble the mint MultiAsset
# ------------------------------------------------------------------
policy_hash = ScriptHash.from_primitive(bytes.fromhex(policy.policy_id))
asset_name_obj = AssetName(asset_name.encode("utf-8"))
asset = Asset()
asset[asset_name_obj] = 1
mint_bundle = MultiAsset()
mint_bundle[policy_hash] = asset
# ------------------------------------------------------------------
# Native script + auxiliary data (metadata + script witness)
# ------------------------------------------------------------------
native_script = NativeScript.from_cbor(bytes.fromhex(policy.script_cbor_hex))
metadata_obj = Metadata(_metadata_dict_with_int_keys(metadata))
aux = AuxiliaryData(metadata_obj)
# AuxiliaryData in pycardano also carries native_scripts attached to the tx body;
# the builder below handles native scripts separately via add_minting_script.
# ------------------------------------------------------------------
# Addresses
# ------------------------------------------------------------------
sender = Address.from_primitive(funding_address)
recipient = Address.from_primitive(recipient_address)
if sender.network != net or recipient.network != net:
raise ValueError(
f"Address network mismatch: requested {network}, "
f"sender={sender.network.name}, recipient={recipient.network.name}"
)
# ------------------------------------------------------------------
# Build the transaction
# ------------------------------------------------------------------
builder = TransactionBuilder(context)
builder.add_input_address(sender)
# Attach mint bundle + policy as a minting script.
builder.mint = mint_bundle
builder.native_scripts = [native_script]
builder.auxiliary_data = aux
# Output: the newly minted NFT in its own UTxO at the recipient, padded
# with min-ADA so the ledger accepts it.
nft_value = Value(min_lovelace_for_nft_utxo, mint_bundle)
builder.add_output(TransactionOutput(recipient, nft_value))
# If the policy has a time lock, the mint tx MUST set ttl <= locked_after_slot
# or the node will reject the witness. Let pycardano pick validity normally,
# but clamp ttl when a lock slot is set.
ttl_offset = None
if policy.locked_after_slot is not None:
try:
chain_tip = context.last_block_slot # type: ignore[attr-defined]
# Cap at 2 hours or (locked_after_slot - chain_tip), whichever is smaller.
two_hours_in_slots = 2 * 60 * 60 # ~1 slot/s on mainnet
ttl_offset = max(
60, min(two_hours_in_slots, policy.locked_after_slot - chain_tip)
)
except Exception: # pragma: no cover — context without chain tip
ttl_offset = None
try:
tx_body = builder.build(
change_address=sender,
auto_ttl_offset=ttl_offset,
auto_validity_start_offset=-30,
)
except Exception as exc:
raise RuntimeError(f"Failed to build mint tx body: {exc}") from exc
tx_id = str(tx_body.id)
summary_lines = [
f"Mint 1 x {policy.policy_id}.{asset_name}",
f" -> recipient: {recipient_address}",
f" fees paid by: {funding_address}",
f" tx_id (pre-sign): {tx_id}",
f" network: {network}",
f" required signers: {len(policy.required_signer_hashes)} "
f"({', '.join(h[:16] + '...' for h in policy.required_signer_hashes) or 'NONE — check policy'})",
]
if policy.locked_after_slot is not None:
summary_lines.append(
f" policy time-lock: slot <= {policy.locked_after_slot}"
)
return UnsignedMint(
tx_id=tx_id,
tx_body_cbor_hex=tx_body.to_cbor_hex(),
auxiliary_data_cbor_hex=aux.to_cbor_hex(),
native_script_cbor_hex=policy.script_cbor_hex,
required_signer_hashes=list(policy.required_signer_hashes),
summary="\n".join(summary_lines),
)
# ---------------------------------------------------------------------------
# Signed-tx submission
# ---------------------------------------------------------------------------
def submit_signed_tx(
signed_tx_cbor_hex: str,
context: Optional["ChainContext"] = None,
ogmios_host: str = "127.0.0.1",
ogmios_port: int = 1337,
network: str = "mainnet",
) -> str:
"""Submit a cold-signed transaction to the network via Ogmios.
The cold signer produces a fully-assembled :class:`pycardano.Transaction`
body + witness set + auxiliary data serialised as CBOR. This
function deserialises that blob, hands it to Ogmios, and returns the
tx hash.
Args:
signed_tx_cbor_hex: Hex-encoded CBOR of the signed transaction.
context: Optional chain context; built from ``ogmios_host/port`` if omitted.
ogmios_host: Host of the Ogmios endpoint.
ogmios_port: Port of the Ogmios endpoint.
network: ``"mainnet"`` or ``"testnet"``.
Returns:
Transaction hash (hex) stable identifier for the submitted tx.
Raises:
RuntimeError: If pycardano is unavailable, or submission fails.
"""
_require_pycardano()
from pycardano import Transaction
if context is None:
from cardano_checkout.txbuild import make_ogmios_context
context = make_ogmios_context(
host=ogmios_host, port=ogmios_port, network=network
)
try:
tx = Transaction.from_cbor(bytes.fromhex(signed_tx_cbor_hex))
except Exception as exc:
raise RuntimeError(f"signed_tx_cbor_hex is not valid transaction CBOR: {exc}") from exc
try:
context.submit_tx(tx) # type: ignore[attr-defined]
except Exception as exc:
raise RuntimeError(f"Ogmios rejected the signed tx: {exc}") from exc
tx_hash = str(tx.id)
logger.info("[mint] submitted signed tx %s", tx_hash)
return tx_hash

View file

@ -30,19 +30,23 @@ from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional
from typing import Awaitable, Callable, Optional
import httpx
from cardano_checkout.invoice import Invoice, InvoiceStatus
from cardano_checkout.oracles import (
KNOWN_TOKENS,
convert_token_to_lovelace,
convert_usd_to_lovelace,
get_ada_usd_price,
)
from cardano_checkout.store import InvoiceStore
# Consumer-supplied pricing callable: takes a USD amount (float),
# returns the current-market lovelace equivalent as int. Invoked by
# :func:`reprice_expired_invoices` to generate fresh quotes when an
# invoice's quote window lapses without payment.
#
# SDK intentionally does NOT ship an oracle. Consumers wire whatever
# price source they trust (CoinGecko, Koios ticker, their own DEX feed,
# or a constant for tests).
PriceFn = Callable[[float], Awaitable[int]]
logger = logging.getLogger(__name__)
KOIOS_URL = "https://api.koios.rest/api/v1/address_utxos"
@ -164,33 +168,12 @@ async def evaluate_utxos(
if qty > 0:
received_assets[asset_id] = received_assets.get(asset_id, 0) + qty
# Convert native assets to lovelace equivalent via DexHunter.
asset_lovelace = 0
for asset_id, qty in received_assets.items():
if "." not in asset_id:
continue
policy_id, asset_name_hex = asset_id.split(".", 1)
decimals = 0
for token_info in KNOWN_TOKENS.values():
if token_info.get("policy_id") == policy_id:
decimals = token_info.get("decimals", 0)
break
try:
lv = await convert_token_to_lovelace(
policy_id, asset_name_hex, qty, decimals
)
if lv is not None:
asset_lovelace += lv
except Exception as e:
logger.warning(
"[cardano-monitor] Failed to convert asset %s to lovelace: %s",
asset_id[:20],
e,
)
total_value = raw_lovelace + asset_lovelace
# ADA-only matching. Any native tokens landed in the same UTxOs are
# recorded in received_assets for visibility but do NOT contribute to
# the payment-matched total. Consumers who want to accept stablecoins
# or other native tokens wrap this function with their own asset-to-
# lovelace converter before comparing against expected_lovelace.
total_value = raw_lovelace
if expected_lovelace == 0:
# Degenerate case — any payment at all counts.
@ -313,23 +296,35 @@ async def check_pending_invoices(
async def reprice_expired_invoices(
store: InvoiceStore,
*,
price_fn: PriceFn,
window_minutes: int = DEFAULT_PAYMENT_WINDOW_MINUTES,
max_repricings: int = DEFAULT_MAX_REPRICINGS,
limit: int = 100,
) -> int:
"""Reprice PENDING invoices whose expiry has passed.
Pulls the current ADA/USD oracle price, recalculates ``expected_lovelace``
from the invoice's ``usd_amount``, resets ``expires_at`` to
``now + window_minutes``, and tracks reprice count in ``invoice.metadata``
under the key ``repriced_count``. After ``max_repricings`` the invoice
is transitioned to :class:`InvoiceStatus.EXPIRED`.
Calls the consumer-supplied ``price_fn(usd_amount) -> lovelace`` to
recompute ``expected_lovelace`` at current market. Resets ``expires_at``
to ``now + window_minutes`` and increments ``invoice.metadata["repriced_count"]``.
After ``max_repricings`` the invoice transitions to
:class:`InvoiceStatus.EXPIRED`.
Args:
store: Persistence backend.
window_minutes: New expiry window per reprice. Matches TradeCraft's
platform-config-driven value of 15 minutes by default.
max_repricings: Give-up threshold. TradeCraft default is 3.
price_fn: Async callable that takes a USD amount and returns the
current lovelace equivalent. Consumer-supplied the SDK does
not ship an oracle. A simple wiring looks like::
from cardano_checkout.monitor import reprice_expired_invoices
async def my_price_fn(usd: float) -> int:
rate = await coingecko_fetch_ada_usd() # your code
return int(round(usd / rate * 1_000_000))
await reprice_expired_invoices(store, price_fn=my_price_fn)
window_minutes: New expiry window per reprice. TradeCraft default 15.
max_repricings: Give-up threshold. TradeCraft default 3.
limit: Max pending invoices to process per call.
Returns:
@ -350,13 +345,6 @@ async def reprice_expired_invoices(
"[cardano-monitor] Repricing %d expired invoice(s)", len(expired_candidates)
)
ada_price = await get_ada_usd_price()
if ada_price <= 0:
logger.warning(
"[cardano-monitor] Cannot reprice — ADA price unavailable"
)
return 0
new_expires_at = now + timedelta(minutes=window_minutes)
updated = 0
@ -386,11 +374,20 @@ async def reprice_expired_invoices(
)
continue
new_lovelace = await convert_usd_to_lovelace(usd_amount)
if new_lovelace == 0:
try:
new_lovelace = await price_fn(usd_amount)
except Exception as e:
logger.warning(
"[cardano-monitor] invoice %s: lovelace conversion returned 0, skipping",
"[cardano-monitor] invoice %s: price_fn raised %s, skipping",
invoice.id,
e,
)
continue
if new_lovelace <= 0:
logger.warning(
"[cardano-monitor] invoice %s: price_fn returned %d, skipping",
invoice.id,
new_lovelace,
)
continue
@ -398,18 +395,15 @@ async def reprice_expired_invoices(
invoice.expected_lovelace = new_lovelace
invoice.expires_at = new_expires_at
invoice.metadata["repriced_count"] = repriced_count + 1
invoice.metadata["ada_price_usd"] = round(ada_price, 4)
await store.update(invoice)
updated += 1
logger.info(
"[cardano-monitor] Repriced invoice %s: %d -> %d lovelace "
"(ADA=$%.4f, reprice #%d)",
"[cardano-monitor] Repriced invoice %s: %d -> %d lovelace (reprice #%d)",
invoice.id,
old_lovelace or 0,
new_lovelace,
ada_price,
repriced_count + 1,
)

View file

@ -1,346 +0,0 @@
"""
Cardano Token Price Service Phase 2 of the Cardano payments system.
Provides cached ADA/USD and token/ADA price lookups used to convert
invoice amounts into lovelace (ADA's base unit) for payment requests.
Data sources:
- ADA/USD: CoinGecko free API (no key required, rate-limited)
- Token/ADA: DexHunter v2 API (DEX aggregator on Cardano)
Cache strategy: module-level dict with timestamps. TTL = 5 minutes.
All functions are async, never raise return None/0 on failure.
"""
import logging
import time
from typing import Optional
import httpx
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Token registry
# ---------------------------------------------------------------------------
KNOWN_TOKENS: dict[str, dict] = {
"ada": {
"policy_id": "",
"asset_name": "",
"ticker": "ADA",
"decimals": 6,
"type": "native",
},
"djed": {
"policy_id": "8db269c3ec630e06ae29f74bc39edd1f87c819f1056206e879a1cd61",
"asset_name": "444a4544", # "DJED".encode().hex()
"ticker": "DJED",
"decimals": 6,
"type": "stablecoin",
},
"iusd": {
"policy_id": "f66d78b4a3cb3d37afa0ec36461e51ecbde00f26c8f0a68f94b69880",
"asset_name": "69555344", # "iUSD".encode().hex()
"ticker": "iUSD",
"decimals": 6,
"type": "stablecoin",
},
"night": {
"policy_id": "0691b2fecca1ac4f53cb6dfb00b7013e561d1f34403b957cbb5af1fa",
"asset_name": "4e49474854", # "NIGHT".encode().hex()
"ticker": "NIGHT",
"decimals": 6,
"type": "utility",
},
"snek": {
"policy_id": "279c909f348e533da5808898f87f9a14bb2c3dfbbacccd631d927a3f",
"asset_name": "534e454b", # "SNEK".encode().hex()
"ticker": "SNEK",
"decimals": 0,
"type": "meme",
},
"iag": {
"policy_id": "5d16944c1e00a5fa1d14ba2460709bc2e41a18e8e1b86a1e7a09da09",
"asset_name": "494147", # "IAG".encode().hex()
"ticker": "IAG",
"decimals": 6,
"type": "utility",
},
}
# ---------------------------------------------------------------------------
# Internal cache — { key: (value, fetched_at_unix) }
# ---------------------------------------------------------------------------
_CACHE: dict[str, tuple] = {}
_CACHE_TTL_SECONDS = 300 # 5 minutes
def _cache_get(key: str) -> Optional[float]:
"""Return cached value if still fresh, else None."""
entry = _CACHE.get(key)
if entry is None:
return None
value, fetched_at = entry
if time.monotonic() - fetched_at > _CACHE_TTL_SECONDS:
return None
return value
def _cache_set(key: str, value: float) -> None:
"""Store value in cache with current timestamp."""
_CACHE[key] = (value, time.monotonic())
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def get_ada_usd_price() -> float:
"""
Fetch the current ADA/USD price from CoinGecko.
Caches result for 5 minutes. Returns 0.0 on failure callers should
treat 0.0 as a signal that pricing is unavailable.
Endpoint: GET https://api.coingecko.com/api/v3/simple/price
"""
cache_key = "ada_usd"
cached = _cache_get(cache_key)
if cached is not None:
return cached
url = "https://api.coingecko.com/api/v3/simple/price"
params = {"ids": "cardano", "vs_currencies": "usd"}
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url, params=params)
resp.raise_for_status()
data = resp.json()
price = float(data["cardano"]["usd"])
except httpx.HTTPStatusError as e:
logger.error(
"[cardano_price] CoinGecko request failed: %s %s",
e.response.status_code,
e.response.text[:200],
)
return 0.0
except (KeyError, ValueError, TypeError) as e:
logger.error("[cardano_price] CoinGecko response parse error: %s", e)
return 0.0
except Exception as e:
logger.error("[cardano_price] CoinGecko unexpected error: %s", e)
return 0.0
logger.debug("[cardano_price] ADA/USD = %.6f (live)", price)
_cache_set(cache_key, price)
return price
async def get_token_ada_price(policy_id: str, asset_name_hex: str) -> Optional[float]:
"""
Fetch the price of a Cardano native token in ADA from DexHunter.
Tries the DexHunter v2 bestPool endpoint first, then falls back to the
community pair endpoint. Both return the token's ADA price per base unit.
Args:
policy_id: The token's Cardano policy ID (hex string).
asset_name_hex: The token's asset name as a hex-encoded string.
Derive with: token_ticker.encode().hex()
Returns:
Price in ADA per base unit of the token, or None if no liquidity /
not found / request failed.
Cache: 5 minutes per (policy_id, asset_name_hex) pair.
"""
if not policy_id or asset_name_hex is None:
# ADA itself — price is 1 ADA by definition
return 1.0
asset_id = f"{policy_id}{asset_name_hex}"
cache_key = f"token_ada:{asset_id}"
cached = _cache_get(cache_key)
if cached is not None:
return cached
price: Optional[float] = None
# --- Attempt 1: DexHunter v2 bestPool ---
try:
url = "https://api-v2.dexhunter.io/swap/bestPool"
params = {"tokenA": "lovelace", "tokenB": asset_id}
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url, params=params)
resp.raise_for_status()
data = resp.json()
# DexHunter returns price_a_per_b or price_b_per_a depending on direction.
# We want ADA per token — look for the field that represents that.
raw_price = (
data.get("price_b_per_a") # token per lovelace inverse
or data.get("price_a_per_b") # ada per token
or data.get("price")
)
if raw_price is not None:
candidate = float(raw_price)
# bestPool returns lovelace-denominated prices — convert to ADA
# If the value is very large (>1000), it's likely lovelace/token, invert & divide
if candidate > 1000:
price = 1_000_000 / candidate # lovelace per token → ADA per token
else:
price = candidate
logger.debug("[cardano_price] %s bestPool price = %.8f ADA", asset_id[:20], price)
except httpx.HTTPStatusError as e:
if e.response.status_code not in (404, 422):
logger.warning(
"[cardano_price] DexHunter bestPool error %s for %s",
e.response.status_code,
asset_id[:20],
)
except Exception as e:
logger.warning("[cardano_price] DexHunter bestPool failed for %s: %s", asset_id[:20], e)
# --- Attempt 2: DexHunter community pair endpoint (fallback) ---
if price is None:
try:
url = f"https://api.dexhunter.io/community/pair/{asset_id}"
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url)
resp.raise_for_status()
data = resp.json()
raw_price = (
data.get("price_ada")
or data.get("priceAda")
or data.get("price")
)
if raw_price is not None:
price = float(raw_price)
logger.debug(
"[cardano_price] %s community pair price = %.8f ADA",
asset_id[:20],
price,
)
except httpx.HTTPStatusError as e:
if e.response.status_code not in (404, 422):
logger.warning(
"[cardano_price] DexHunter community error %s for %s",
e.response.status_code,
asset_id[:20],
)
except Exception as e:
logger.warning("[cardano_price] DexHunter community failed for %s: %s", asset_id[:20], e)
if price is not None and price > 0:
_cache_set(cache_key, price)
return price
logger.info("[cardano_price] No price found for %s (no liquidity or unsupported)", asset_id[:20])
return None
async def convert_usd_to_lovelace(usd_amount: float) -> int:
"""
Convert a USD amount to lovelace using the current ADA/USD price.
1 ADA = 1,000,000 lovelace.
Args:
usd_amount: Amount in USD (e.g. 49.99).
Returns:
Equivalent lovelace as an integer, or 0 if ADA price is unavailable.
Example:
>>> await convert_usd_to_lovelace(10.00)
# At ADA = $0.45 → 10 / 0.45 ADA → 22,222,222 lovelace
"""
if usd_amount <= 0:
return 0
ada_usd = await get_ada_usd_price()
if ada_usd <= 0:
logger.error("[cardano_price] Cannot convert USD to lovelace — ADA price unavailable")
return 0
ada_amount = usd_amount / ada_usd
lovelace = int(ada_amount * 1_000_000)
logger.debug(
"[cardano_price] $%.2f USD → %.6f ADA → %d lovelace (rate: $%.6f/ADA)",
usd_amount,
ada_amount,
lovelace,
ada_usd,
)
return lovelace
async def convert_token_to_lovelace(
policy_id: str,
asset_name_hex: str,
token_quantity: int,
token_decimals: int = 0,
) -> Optional[int]:
"""
Convert a raw token quantity to its equivalent lovelace value.
Uses the token's ADA price from DexHunter and accounts for decimal
precision so that, for example, 1,000,000 units of a 6-decimal token
equals 1.0 whole token.
Args:
policy_id: Token policy ID.
asset_name_hex: Token asset name as hex (e.g. "534e454b" for SNEK).
token_quantity: Raw on-chain token quantity (base units, not decimal-adjusted).
token_decimals: Number of decimal places for the token (default 0).
Returns:
Equivalent lovelace as an integer, or None if price is unavailable.
Example:
# NIGHT token at 0.001 ADA/NIGHT, 6 decimals
# quantity = 5_000_000 (= 5.0 NIGHT), price = 0.001 ADA/token
# → 5.0 * 0.001 ADA = 0.005 ADA = 5,000 lovelace
>>> await convert_token_to_lovelace(policy_id, asset_name_hex, 5_000_000, 6)
5000
"""
if token_quantity <= 0:
return 0
# ADA is always 1:1 with itself in lovelace terms
if not policy_id and not asset_name_hex:
return token_quantity # already in lovelace
token_ada_price = await get_token_ada_price(policy_id, asset_name_hex)
if token_ada_price is None:
logger.warning(
"[cardano_price] Cannot convert token to lovelace — no price for %s%s",
policy_id[:12],
asset_name_hex[:8],
)
return None
# Adjust for decimals: base_units / 10^decimals = whole tokens
whole_tokens = token_quantity / (10 ** token_decimals)
# Whole tokens × ADA per token × lovelace per ADA
lovelace = int(whole_tokens * token_ada_price * 1_000_000)
logger.debug(
"[cardano_price] %d base units (decimals=%d) → %.6f tokens × %.8f ADA → %d lovelace",
token_quantity,
token_decimals,
whole_tokens,
token_ada_price,
lovelace,
)
return lovelace

View file

@ -37,6 +37,7 @@ from cardano_checkout.monitor import (
DEFAULT_MAX_REPRICINGS,
DEFAULT_PAYMENT_WINDOW_MINUTES,
KOIOS_URL,
PriceFn,
check_pending_invoices,
reprice_expired_invoices,
)
@ -64,6 +65,7 @@ class InvoiceScheduler:
"""
store: InvoiceStore
price_fn: Optional[PriceFn] = None
koios_url: str = KOIOS_URL
check_interval_seconds: int = 15
reprice_interval_seconds: int = 60
@ -84,9 +86,15 @@ class InvoiceScheduler:
)
async def _job_reprice_expired(self) -> None:
if self.price_fn is None:
# No oracle wired — skip repricing silently. Consumers that
# don't care about the USD-lock workflow (e.g. fixed-ADA
# invoices) will never configure a price_fn; that's fine.
return
try:
await reprice_expired_invoices(
self.store,
price_fn=self.price_fn,
window_minutes=self.payment_window_minutes,
max_repricings=self.max_repricings,
limit=self.limit,

View file

@ -1,440 +0,0 @@
"""TradeCraft-specific compatibility shim.
TradeCraft's ``services/cardano_scheduler.py`` shipped five jobs, of which
only two ``check_pending_payments`` and ``reprice_expired_payments``
are generic invoice logic. The remaining three
(``_check_subscription_payments``, ``_reprice_subscription_payments``,
``_enforce_grace_period``) manipulate TradeCraft's ``Company``,
``Subscription``, and ``SubscriptionPayment`` SQLAlchemy models directly;
those are merchant-specific concerns that do not belong in the generic SDK.
This module preserves the original TradeCraft import surface so the
existing TradeCraft code path still works while the migration to
:class:`cardano_checkout.store.InvoiceStore` is in-flight. None of the
symbols here are meant to be used by new consumers.
**Do not depend on this module outside TradeCraft.** It is scheduled to
be removed once TradeCraft migrates fully to the Protocol-based API (see
TODO in the repo README).
All functions in here are *verbatim* lifts from the original
``services/cardano_scheduler.py`` TradeCraft's ``models`` + ``database``
modules are imported lazily so that importing this module never fails
for non-TradeCraft consumers. If TradeCraft's models are not importable
the jobs raise at call time, not at import time.
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from cardano_checkout.monitor import check_address_utxos, evaluate_utxos
from cardano_checkout.oracles import convert_usd_to_lovelace, get_ada_usd_price
logger = logging.getLogger(__name__)
# Original TradeCraft free-function names — keep these stable.
_check_address_utxos = check_address_utxos
_evaluate_payment = evaluate_utxos
def _require_tradecraft_models():
"""Lazy import of the TradeCraft-specific models + session maker.
Returns ``(Company, Subscription, SubscriptionPayment, async_session_maker)``.
Raises ImportError with a clear message if TradeCraft isn't installed.
"""
try:
from database import async_session_maker # type: ignore[import-not-found]
from models import ( # type: ignore[import-not-found]
Company,
Subscription,
SubscriptionPayment,
)
except ImportError as exc: # pragma: no cover — only meaningful inside TradeCraft
raise ImportError(
"cardano_checkout.tradecraft_compat requires the TradeCraft app's "
"`models` + `database` modules to be importable. This shim is "
"only meant to be used from within TradeCraft itself."
) from exc
return Company, Subscription, SubscriptionPayment, async_session_maker
# ---------------------------------------------------------------------------
# Subscription payments job — verbatim TradeCraft logic
# ---------------------------------------------------------------------------
async def check_subscription_payments() -> None: # pragma: no cover — TradeCraft-only
"""Poll Koios for UTXOs at awaiting_payment subscription addresses.
On confirmation, advances ``Subscription.status`` to ``"active"`` and
updates ``Company.subscription_tier``. TradeCraft-specific.
"""
from sqlalchemy import select
Company, Subscription, SubscriptionPayment, async_session_maker = (
_require_tradecraft_models()
)
try:
async with async_session_maker() as db:
now = datetime.now(timezone.utc)
result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status.in_(["awaiting_payment", "underpaid"]),
SubscriptionPayment.expires_at > now,
)
)
payments = result.scalars().all()
if not payments:
return
logger.debug(
"[tradecraft-compat] Checking %d subscription payment(s)",
len(payments),
)
for sp in payments:
try:
utxos = await _check_address_utxos(sp.address)
expected = sp.expected_lovelace or 0
(
new_status_enum,
raw_lovelace,
total_value,
received_assets,
tx_hash,
) = await _evaluate_payment(expected, utxos)
new_status = new_status_enum.value
status_map = {
"pending": "awaiting_payment",
"confirmed": "confirmed",
"overpaid": "overpaid",
"underpaid": "underpaid",
}
mapped_status = status_map.get(new_status, new_status)
if mapped_status == sp.status and raw_lovelace == 0:
continue
sp.received_lovelace = raw_lovelace
sp.total_value_lovelace = total_value
sp.received_assets = received_assets
if tx_hash:
sp.tx_hash = tx_hash
if mapped_status != sp.status:
old_status = sp.status
sp.status = mapped_status
if mapped_status in ("confirmed", "overpaid"):
sp.confirmed_at = now
if sp.subscription_id:
sub_result = await db.execute(
select(Subscription).where(
Subscription.id == sp.subscription_id
)
)
sub = sub_result.scalar_one_or_none()
if sub:
sub.status = "active"
sub.updated_at = now
if (
sub.pending_tier
and sp.period_end
and sp.period_end <= now.date()
):
sub.tier = sub.pending_tier
sub.pending_tier = None
sub.pending_tier_at = None
company_result = await db.execute(
select(Company).where(Company.id == sp.company_id)
)
company = company_result.scalar_one_or_none()
if company:
sub_result2 = await db.execute(
select(Subscription).where(
Subscription.company_id == sp.company_id
)
)
sub2 = sub_result2.scalar_one_or_none()
if sub2:
company.subscription_tier = sub2.tier
company.subscription_status = "active"
logger.info(
"[tradecraft-compat] sub_payment #%d company_id=%d: "
"%s -> %s (%.6f ADA received)",
sp.id,
sp.company_id,
old_status,
mapped_status,
raw_lovelace / 1_000_000,
)
except Exception as e:
logger.exception(
"[tradecraft-compat] Error checking sub_payment #%d: %s",
sp.id,
e,
)
await db.commit()
except Exception:
logger.exception(
"[tradecraft-compat] check_subscription_payments job failed"
)
async def reprice_subscription_payments() -> None: # pragma: no cover — TradeCraft-only
"""Reprice expired subscription payments — 24h window, 3-reprice cap."""
from sqlalchemy import select
_, _, SubscriptionPayment, async_session_maker = _require_tradecraft_models()
try:
async with async_session_maker() as db:
now = datetime.now(timezone.utc)
result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status == "awaiting_payment",
SubscriptionPayment.expires_at <= now,
SubscriptionPayment.repriced_count < 3,
)
)
payments = result.scalars().all()
if not payments:
return
logger.info(
"[tradecraft-compat] Repricing %d subscription payment(s)",
len(payments),
)
ada_price = await get_ada_usd_price()
if ada_price <= 0:
logger.warning(
"[tradecraft-compat] Cannot reprice subscriptions — "
"ADA price unavailable"
)
return
new_expires_at = now + timedelta(hours=24)
for sp in payments:
try:
total_usd = float(sp.expected_usd or 0)
if total_usd <= 0:
sp.status = "expired"
continue
new_lovelace = await convert_usd_to_lovelace(total_usd)
if new_lovelace == 0:
continue
old_lovelace = sp.expected_lovelace
sp.expected_lovelace = new_lovelace
sp.ada_price_usd = Decimal(str(round(ada_price, 4)))
sp.expires_at = new_expires_at
sp.repriced_count += 1
logger.info(
"[tradecraft-compat] Repriced sub_payment #%d: %d -> %d "
"lovelace (ADA=$%.4f, reprice #%d)",
sp.id,
old_lovelace or 0,
new_lovelace,
ada_price,
sp.repriced_count,
)
except Exception as e:
logger.exception(
"[tradecraft-compat] Error repricing sub_payment #%d: %s",
sp.id,
e,
)
expired_result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status == "awaiting_payment",
SubscriptionPayment.expires_at <= now,
SubscriptionPayment.repriced_count >= 3,
)
)
for sp in expired_result.scalars().all():
sp.status = "expired"
logger.info(
"[tradecraft-compat] sub_payment #%d expired after %d repricings",
sp.id,
sp.repriced_count,
)
await db.commit()
except Exception:
logger.exception(
"[tradecraft-compat] reprice_subscription_payments job failed"
)
async def enforce_grace_period() -> None: # pragma: no cover — TradeCraft-only
"""Daily grace-period enforcement — past_due / suspended transitions."""
from sqlalchemy import select
Company, Subscription, SubscriptionPayment, async_session_maker = (
_require_tradecraft_models()
)
try:
async with async_session_maker() as db:
today = datetime.now(timezone.utc).date()
overdue_result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status.in_(
["awaiting_payment", "underpaid", "expired"]
),
SubscriptionPayment.due_date < today,
)
)
overdue_payments = overdue_result.scalars().all()
for sp in overdue_payments:
try:
sub_result = await db.execute(
select(Subscription).where(
Subscription.company_id == sp.company_id
)
)
sub = sub_result.scalar_one_or_none()
if not sub or sub.status in ("cancelled", "suspended"):
continue
company_result = await db.execute(
select(Company).where(Company.id == sp.company_id)
)
company = company_result.scalar_one_or_none()
if sp.grace_deadline and today > sp.grace_deadline:
if sub.status != "suspended":
sub.status = "suspended"
sub.updated_at = datetime.now(timezone.utc)
if company:
company.subscription_status = "suspended"
logger.info(
"[tradecraft-compat] company_id=%d suspended "
"(grace deadline %s passed)",
sp.company_id,
sp.grace_deadline,
)
elif sub.status == "active":
sub.status = "past_due"
sub.updated_at = datetime.now(timezone.utc)
if company:
company.subscription_status = "past_due"
logger.info(
"[tradecraft-compat] company_id=%d past_due "
"(due_date %s passed)",
sp.company_id,
sp.due_date,
)
except Exception as e:
logger.exception(
"[tradecraft-compat] Error enforcing grace period "
"for sub_payment #%d: %s",
sp.id,
e,
)
await db.commit()
except Exception:
logger.exception(
"[tradecraft-compat] enforce_grace_period job failed"
)
# ---------------------------------------------------------------------------
# Standalone TradeCraft scheduler — registers the subscription jobs only.
# ---------------------------------------------------------------------------
_tc_scheduler: Optional[AsyncIOScheduler] = None
async def start_tradecraft_scheduler() -> None: # pragma: no cover — TradeCraft-only
"""Start ONLY the TradeCraft-specific subscription + grace-period jobs.
The generic invoice jobs should be run via :class:`InvoiceScheduler`
against a ``SQLAlchemyInvoiceStore`` adapter (not shipped here
TradeCraft is responsible for implementing it during the migration).
"""
global _tc_scheduler
if _tc_scheduler and _tc_scheduler.running:
return
_tc_scheduler = AsyncIOScheduler()
_tc_scheduler.add_job(
check_subscription_payments,
trigger=IntervalTrigger(seconds=60),
id="tradecraft_check_sub_payments",
name="TradeCraft: Check Subscription Payments",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_tc_scheduler.add_job(
reprice_subscription_payments,
trigger=IntervalTrigger(hours=6),
id="tradecraft_reprice_sub_payments",
name="TradeCraft: Reprice Subscription Payments",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_tc_scheduler.add_job(
enforce_grace_period,
trigger=CronTrigger(hour=6, minute=0, timezone="UTC"),
id="tradecraft_enforce_grace",
name="TradeCraft: Enforce Subscription Grace Periods",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_tc_scheduler.start()
logger.info(
"[tradecraft-compat] Started — subscription + grace-period jobs only"
)
async def stop_tradecraft_scheduler() -> None: # pragma: no cover — TradeCraft-only
"""Stop the TradeCraft-specific scheduler."""
global _tc_scheduler
if _tc_scheduler:
_tc_scheduler.shutdown(wait=False)
_tc_scheduler = None

View file

@ -1,207 +0,0 @@
"""Transaction construction helpers wrapping PyCardano.
This module is the SDK's single point of contact with PyCardano's
:class:`pycardano.backend.base.ChainContext` API. Everything higher up
(``mint`` and eventual refund-path code) goes through the helpers
here so we can swap Ogmios for Blockfrost / Cardano-CLI without
touching callers.
The default context targets the local Ogmios instance on Rackham
(``127.0.0.1:1337``). That lines up with the mainnet deployment of the
``cardano-node`` container (v10.6.2 on port 6000 via N2N) fronted by
Ogmios as the HTTP+WS bridge. Preprod / testnet callers pass
``network="testnet"`` and typically point at a different host.
Cold-signer shape
-----------------
``txbuild`` only knows the hot-side half of the dance:
- :func:`make_ogmios_context` build a context from the live node.
- :func:`get_protocol_parameters` peek at the current protocol params
(useful for pricing, ttl calculations, etc.).
- :func:`get_address_utxos` list UTxOs at an address (refund path).
- :func:`submit_signed_tx` ship a tx that was signed offline.
Body construction lives in :mod:`cardano_checkout.mint` today. As
additional tx shapes (refunds, batched mints) arrive they'll land here
alongside ``build_*_tx`` helpers that return :class:`UnsignedMint`-style
cold-signer bundles.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, Optional
logger = logging.getLogger(__name__)
if TYPE_CHECKING: # pragma: no cover — hints only
from pycardano import ChainContext, UTxO
# ---------------------------------------------------------------------------
# Chain context
# ---------------------------------------------------------------------------
def _require_pycardano() -> None:
try:
import pycardano # noqa: F401
except ImportError as exc: # pragma: no cover — env sanity
raise RuntimeError(
"pycardano is required for transaction construction. "
"Add pycardano>=0.11.0 to requirements.txt and reinstall."
) from exc
def make_ogmios_context(
host: str = "127.0.0.1",
port: int = 1337,
network: str = "mainnet",
secure: bool = False,
**kwargs: Any,
) -> "ChainContext":
"""Construct an :class:`pycardano.OgmiosChainContext` for the live node.
Args:
host: Ogmios HTTP+WS host. Default ``127.0.0.1`` (local).
port: Ogmios port. Default ``1337`` (matches Rackham's stack).
network: ``"mainnet"`` or ``"testnet"``. Controls the
:class:`pycardano.Network` passed to the context.
secure: Whether to use wss:// instead of ws://. Default False
the stack assumes a loopback connection.
**kwargs: Forwarded to ``OgmiosChainContext`` verbatim (e.g.
``refetch_chain_tip_interval``, ``utxo_cache_size``).
Returns:
A live :class:`ChainContext`. If the backing node is down the
object is still constructed failures surface on the first
query / submit call.
"""
_require_pycardano()
from pycardano import Network, OgmiosChainContext
net = Network.MAINNET if network == "mainnet" else Network.TESTNET
logger.debug(
"[txbuild] OgmiosChainContext -> %s://%s:%d (network=%s)",
"wss" if secure else "ws",
host,
port,
network,
)
return OgmiosChainContext(
host=host, port=port, secure=secure, network=net, **kwargs
)
def get_protocol_parameters(context: "ChainContext") -> Any:
"""Return the live protocol parameters from the chain context.
Useful for fee estimation, min-utxo floor computation, and sanity
checks that the node is reachable before a mint attempt.
The return type is pycardano's :class:`ProtocolParameters` — a
dataclass with fields like ``min_fee_a``, ``min_fee_b``,
``coins_per_utxo_byte``, ``max_tx_size``, etc.
"""
try:
return context.protocol_param # type: ignore[attr-defined]
except Exception as exc:
raise RuntimeError(
f"Failed to fetch protocol parameters from chain context: {exc}"
) from exc
def get_address_utxos(context: "ChainContext", address: str) -> list["UTxO"]:
"""Fetch UTxOs at ``address`` via the chain context.
Intended for the refund path when an invoice is cancelled or
overpaid the merchant needs to know which UTxOs landed in order to
build a return tx. For pure payment-detection, Koios is still the
cheaper source (see :mod:`cardano_checkout.monitor`).
Args:
context: Live chain context (from :func:`make_ogmios_context`).
address: Bech32 Cardano address.
Returns:
List of pycardano :class:`UTxO` objects at ``address``. Empty if
the address has no unspent outputs. Never ``None``.
Raises:
RuntimeError: If the underlying query fails (node down, invalid address).
"""
_require_pycardano()
from pycardano import Address
try:
addr_obj = Address.from_primitive(address)
except Exception as exc:
raise RuntimeError(f"Invalid Cardano address: {exc}") from exc
try:
utxos = context.utxos(str(addr_obj)) # type: ignore[attr-defined]
except Exception as exc:
raise RuntimeError(
f"Failed to fetch UTxOs for {address[:20]}...: {exc}"
) from exc
return list(utxos or [])
# ---------------------------------------------------------------------------
# Signed-tx submission (duplicated from mint.py as a stable txbuild entry
# point — the mint module's version delegates here)
# ---------------------------------------------------------------------------
def submit_signed_tx(
signed_tx_cbor_hex: str,
context: Optional["ChainContext"] = None,
ogmios_host: str = "127.0.0.1",
ogmios_port: int = 1337,
network: str = "mainnet",
) -> str:
"""Submit a cold-signed transaction blob to the chain.
See :func:`cardano_checkout.mint.submit_signed_tx` for the full docstring
this is the same function under the ``txbuild`` import path so callers
that only need submission don't have to import ``mint``.
"""
_require_pycardano()
from pycardano import Transaction
if context is None:
context = make_ogmios_context(
host=ogmios_host, port=ogmios_port, network=network
)
try:
tx = Transaction.from_cbor(bytes.fromhex(signed_tx_cbor_hex))
except Exception as exc:
raise RuntimeError(
f"signed_tx_cbor_hex is not valid transaction CBOR: {exc}"
) from exc
try:
context.submit_tx(tx) # type: ignore[attr-defined]
except Exception as exc:
raise RuntimeError(f"Ogmios rejected the signed tx: {exc}") from exc
tx_hash = str(tx.id)
logger.info("[txbuild] submitted signed tx %s", tx_hash)
return tx_hash
# ---------------------------------------------------------------------------
# Placeholders for future tx shapes (kept so consumers can pin imports)
# ---------------------------------------------------------------------------
def build_payment_tx(*args, **kwargs): # pragma: no cover — future work
"""Build an unsigned plain-ADA payment tx (refund path). v0.3+."""
raise NotImplementedError(
"build_payment_tx lands in v0.3 alongside the refund workflow. "
"For v0.2 only mint txs are supported."
)