v0.1.0-dev: initial extraction from TradeCraft + new abstractions

Sulkta Coop's Python SDK for merchant-side Cardano payments +
NFT certificate-of-authenticity minting. Zero-custody by design.

Extracted from TradeCraft's services/cardano_*.py (2,400+ lines of
production Cardano-mainnet code) and restructured as an installable
Python package.

Package layout (cardano_checkout/):
- addresses.py   — lifted verbatim: CIP-1852 HD derivation, pure pycardano
- oracles.py     — lifted from cardano_price.py: Koios ADA/USD feed w/ 5m cache
- monitor.py     — lifted verbatim (SQLAlchemy-coupled; v0.2 refactors to Store)
- scheduler.py   — lifted verbatim (same refactor note)
- invoice.py     — NEW: framework-agnostic Invoice dataclass + lifecycle enum
- store.py       — NEW: InvoiceStore Protocol for pluggable persistence
- mint.py        — NEW: CIP-25 v2 metadata builder (works); tx submission stub for v0.2
- ipfs.py        — NEW: kubo HTTP client with primary-pin + mirror-pin pattern
- txbuild.py     — NEW: v0.2 stub for PyCardano / Ogmios tx construction

Design:
- Consumers provide xpub + InvoiceStore impl. SDK provides everything else.
- IPFS: local kubo for upload + serve, optional mirror pins for archival.
  Chromaticcraft pattern: Rackham kubo primary, Lucy kubo mirror.
- NFT: single native-script policy per merchant studio (CIP-25 v2, not CIP-68
  — full wallet coverage, no mutability needed for static certs). Policy skey
  stays under Sulkta cold-custody (Lucy pattern); signing is an external
  hand-off like ADAMaps payouts.

Tests: pure-module smoke tests pass for invoice, store-protocol, CIP-25
metadata envelope, IPFS client import, txbuild stub module. Address
derivation tests ship but require pycardano + will exercise in CI.

LICENSE: Apache-2.0 (matches upstream Cardano tooling).

Next (v0.2 scope):
- Refactor monitor + scheduler around InvoiceStore (drop SQLAlchemy coupling)
- Wire mint.mint_nft_cert to PyCardano + local Ogmios on Rackham
- txbuild: Ogmios chain-context + cold-signer hand-off shape
- chromaticcraft Phase 2 imports the SDK as its first external consumer
This commit is contained in:
Kayos 2026-04-23 18:04:00 -07:00
commit dc6378eda6
17 changed files with 2429 additions and 0 deletions

11
.gitignore vendored Normal file
View file

@ -0,0 +1,11 @@
__pycache__/
*.py[cod]
*.egg-info/
*.egg
.pytest_cache/
.mypy_cache/
.ruff_cache/
build/
dist/
.venv/
.env

201
LICENSE Normal file
View file

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for describing the origin of the Work and
reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Support. While redistributing the Work or
Derivative Works thereof, You may choose to offer, and charge a
fee for, acceptance of support, warranty, indemnity, or other
liability obligations and/or rights consistent with this License.
However, in accepting such obligations, You may act only on Your
own behalf and on Your sole responsibility, not on behalf of any
other Contributor, and only if You agree to indemnify, defend,
and hold each Contributor harmless for any liability incurred by,
or claims asserted against, such Contributor by reason of your
accepting any such warranty or support.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2026 Sulkta Coop
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License.

134
README.md Normal file
View file

@ -0,0 +1,134 @@
# cardano-checkout
Python SDK for merchant-side Cardano payments + NFT certificate-of-authenticity minting.
**Zero-custody by design:** the merchant provides a wallet xpub. The SDK derives
unique receive addresses per invoice, polls the chain for payment, and optionally
mints a CIP-25 NFT cert on confirmation. The platform never holds or moves funds.
Extracted from [TradeCraft](http://192.168.0.5:3001/TradeCraft/tradecraft)'s
`services/cardano_*.py` modules (2,400+ lines of production code running on the
Cardano mainnet) and packaged for reuse across the Sulkta Coop product family.
## Status
**v0.1.0-dev — alpha extraction.** Pure modules lifted verbatim from TradeCraft.
DB-coupled modules (monitor, scheduler) ship with a `TODO: refactor to Store
protocol` marker — they work as-is when paired with TradeCraft's SQLAlchemy models
but will be refactored to the generic `InvoiceStore` Protocol in v0.2.
| Module | Status | Notes |
|---|---|---|
| `addresses` | ✅ stable | CIP-1852 HD derivation; pure pycardano |
| `oracles` | ✅ stable | ADA/USD price via Koios with 5-min cache |
| `invoice` + `store` | ✅ new | Framework-agnostic invoice + persistence Protocol |
| `mint` | ⏳ stub | CIP-25 v2 metadata builder works; tx submission in v0.2 |
| `ipfs` | ✅ working | kubo HTTP API client w/ optional mirror-pin |
| `monitor` | 🟡 SQLAlchemy-coupled | v0.2 target: refactor around `InvoiceStore` |
| `scheduler` | 🟡 SQLAlchemy-coupled | v0.2 target: same |
| `txbuild` | ❌ v0.2 | Full PyCardano tx construction via Ogmios |
## Design
```
┌────────────────────────────────────────────────────────┐
│ Merchant App │
│ (TradeCraft / chromaticcraft / your-product) │
└──────────────┬───────────────────────┬─────────────────┘
│ │
uses │ implements │ imports
▼ ▼
┌──────────────┐ ┌────────────────────────┐
│ InvoiceStore │ ◄────── │ cardano_checkout SDK │
│ (your DB) │ │ │
└──────────────┘ │ addresses ← pure │
│ oracles ← pure │
│ invoice ← dataclass │
│ monitor ← polls chain │
│ scheduler ← bg loop │
│ mint ← NFT cert │
│ ipfs ← upload │
│ txbuild ← PyCardano wrappers │
└────────────────────────┘
talks to │
┌────────────────────────┐
│ Koios + Ogmios + kubo │
└────────────────────────┘
```
The merchant app provides:
1. A wallet xpub (account-level extended public key).
2. An `InvoiceStore` implementation (SQLAlchemy, Postgres, SQLite, in-memory — whatever).
The SDK provides:
1. Address derivation from the xpub.
2. Per-invoice payment monitoring against Koios.
3. ADA ↔ USD price conversion.
4. CIP-25 v2 NFT cert minting (v0.2).
5. IPFS upload + pinning for NFT image metadata.
## Quick start
```python
import asyncio
from cardano_checkout import addresses, oracles
# Derive a receive address for invoice #42
addr = addresses.derive_address(
xpub_hex="<your wallet xpub>",
index=42,
network="mainnet",
)
# Convert a USD price to lovelace at current market
async def main() -> None:
lovelace = await oracles.convert_usd_to_lovelace(99.00)
ada = lovelace / 1_000_000
print(f"Customer owes {ada:.4f} ADA for $99")
asyncio.run(main())
```
## IPFS: bake-then-mirror pattern
The SDK's `IPFSClient` expects a local kubo daemon (typically in the same
Docker image as the web app) for upload and primary pin, and takes an
optional list of mirror endpoints to `pin add` the CID on a second node
for archival redundancy.
Typical chromaticcraft deployment:
```python
from cardano_checkout import ipfs
client = ipfs.IPFSClient(
api_url="http://127.0.0.1:5001", # local kubo in the same container
mirror_api_urls=["http://192.168.254.5:5001"], # Lucy's kubo over the LAN/VPN
)
cid = await client.add(photo_bytes, filename="order-0001.jpg")
# Image now served by Rackham (low latency) AND pinned on Lucy (durability)
```
## NFT cert-of-authenticity design
One minting policy per merchant studio. Policy is a native script (no Plutus
required), optionally time-locked to make "no more editions after X" a
cryptographically verifiable claim.
CIP-25 v2 metadata. Single NFT per order. Policy skey never leaves the custody
host (Lucy in Sulkta's pattern). The SDK builds the metadata envelope + tx;
external signer does the signature.
## Installation
```
pip install 'cardano-checkout[sqlalchemy]' # if you're using SQLAlchemy
pip install cardano-checkout # core only
```
## License
Apache-2.0 — matches upstream Cardano tooling.

View file

@ -0,0 +1,48 @@
"""cardano_checkout — Python SDK for merchant-side Cardano payments + NFT cert minting.
Zero-custody by design: consumers provide a wallet xpub (account-level
extended public key). The SDK derives unique receive addresses per
invoice, polls the chain for payment, and (optionally) mints a CIP-25
NFT certificate of authenticity on confirmation.
Quick start::
from cardano_checkout import addresses, oracles
addr = addresses.derive_address(xpub_hex, index=42, network="mainnet")
price = await oracles.get_ada_usd_price()
lovelace = await oracles.convert_usd_to_lovelace(99.00)
For full invoice lifecycle see :mod:`cardano_checkout.invoice` +
:mod:`cardano_checkout.store` (Protocol-based persistence).
For NFT minting see :mod:`cardano_checkout.mint`.
"""
from __future__ import annotations
__version__ = "0.1.0-dev"
# Pure modules — stable API from extraction
from cardano_checkout import addresses, oracles # noqa: F401
# Payment lifecycle
from cardano_checkout.invoice import Invoice, InvoiceStatus # noqa: F401
from cardano_checkout.store import InvoiceStore # noqa: F401
# NFT + IPFS
from cardano_checkout.mint import MintPolicy, mint_nft_cert # noqa: F401
from cardano_checkout.ipfs import IPFSClient, pin_bytes # noqa: F401
__all__ = [
"__version__",
"addresses",
"oracles",
"Invoice",
"InvoiceStatus",
"InvoiceStore",
"MintPolicy",
"mint_nft_cert",
"IPFSClient",
"pin_bytes",
]

View file

@ -0,0 +1,218 @@
"""
Cardano HD address derivation service.
Derives Cardano base addresses from an account-level extended public key (xpub)
exported from wallets such as Eternl or Lace. Uses BIP-44 derivation via pycardano.
Key derivation path: m / 1852' / 1815' / account' / chain / index
- chain 0 = external (receive) addresses
- chain 2 = staking key (always index 0 for the account)
The xpub accepted here is the *account* public key the root has already been
hardened away by the wallet. We only perform soft derivation from account level
down, so no private key material is ever needed or touched.
"""
import logging
from typing import Optional
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def derive_address(xpub_hex: str, index: int, network: str = "mainnet") -> str:
"""
Derive a Cardano base address at the given receive-address index.
The address is a Shelley-era base address combining:
- payment key: account_xpub / 0 (external chain) / index
- staking key: account_xpub / 2 (staking chain) / 0
Args:
xpub_hex: Hex-encoded account extended public key (64 bytes raw or
96 bytes with chain code, as exported by most CIP-1852 wallets).
index: Receive address index (0-based). Must be >= 0.
network: "mainnet" or "testnet" (preprod / preview). Defaults to mainnet.
Returns:
Bech32-encoded Cardano base address (addr1... or addr_test1...).
Raises:
ValueError: If xpub_hex is malformed, index is negative, or network is invalid.
RuntimeError: If pycardano is not installed or derivation fails unexpectedly.
"""
_require_pycardano()
if index < 0:
raise ValueError(f"Address index must be non-negative, got {index}")
net = _parse_network(network)
acct_pub = _parse_xpub(xpub_hex)
try:
# External receive chain (0) / address index
addr_pub = acct_pub.derive(0).derive(index)
# Staking chain (2) / always index 0 for the account
stake_pub = acct_pub.derive(2).derive(0)
except Exception as exc:
logger.exception("[cardano] Key derivation failed at index %d", index)
raise RuntimeError(f"Key derivation failed: {exc}") from exc
from pycardano import Address
address = Address(
payment_part=addr_pub.hash(),
staking_part=stake_pub.hash(),
network=net,
)
return str(address)
def validate_xpub(xpub_hex: str) -> bool:
"""
Validate that an xpub string is well-formed and parseable.
Checks:
- Is a non-empty string
- Is valid hex
- Is a valid pycardano HDPublicKey (correct byte length, valid point on curve)
Args:
xpub_hex: Hex-encoded account extended public key.
Returns:
True if the xpub is valid, False otherwise. Never raises.
"""
if not xpub_hex or not isinstance(xpub_hex, str):
return False
# Quick hex sanity before paying the crypto cost
stripped = xpub_hex.strip()
if not _is_hex(stripped):
return False
try:
_require_pycardano()
_parse_xpub(stripped)
return True
except Exception:
return False
def get_address_preview(xpub_hex: str, network: str = "mainnet") -> str:
"""
Derive the address at index 0 for settings UI preview.
Thin wrapper around derive_address exists so callers don't have to
know or care about the index convention.
Args:
xpub_hex: Hex-encoded account extended public key.
network: "mainnet" or "testnet". Defaults to mainnet.
Returns:
Bech32-encoded Cardano base address at index 0.
Raises:
ValueError: If xpub_hex is malformed or network is invalid.
RuntimeError: If derivation fails unexpectedly.
"""
return derive_address(xpub_hex, index=0, network=network)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _require_pycardano() -> None:
"""Raise a clear RuntimeError if pycardano is not installed."""
try:
import pycardano # noqa: F401
except ImportError as exc:
raise RuntimeError(
"pycardano is required for Cardano address derivation. "
"Add pycardano>=0.11.0 to requirements.txt and reinstall."
) from exc
def _parse_network(network: str):
"""
Parse a network string into a pycardano Network enum value.
Args:
network: "mainnet" or "testnet".
Returns:
pycardano.Network enum member.
Raises:
ValueError: If network is not one of the accepted values.
"""
from pycardano import Network
if network == "mainnet":
return Network.MAINNET
if network == "testnet":
return Network.TESTNET
raise ValueError(
f"Invalid network '{network}'. Expected 'mainnet' or 'testnet'."
)
def _parse_xpub(xpub_hex: str):
"""
Parse a hex-encoded extended public key into an HDPublicKey.
pycardano's HDPublicKey.from_primitive expects 64 raw bytes
(32-byte Ed25519 public key + 32-byte chain code). Some wallets
export 96 bytes; if so, we strip the trailing 32 bytes which are
the public key repeated.
Args:
xpub_hex: Hex-encoded extended public key string.
Returns:
pycardano.HDPublicKey instance.
Raises:
ValueError: If the byte length is unexpected or the key is invalid.
"""
from pycardano import HDPublicKey
try:
raw = bytes.fromhex(xpub_hex.strip())
except ValueError as exc:
raise ValueError(f"xpub_hex is not valid hex: {exc}") from exc
# Standard CIP-1852 account xpub is 64 bytes (pubkey || chain_code).
# Some export formats prepend 32 zeroed or duplicated bytes — handle both.
if len(raw) == 64:
pass # Expected format
elif len(raw) == 96:
# Strip the first 32 bytes (typically a duplicate or empty prefix)
raw = raw[32:]
else:
raise ValueError(
f"Unexpected xpub length: {len(raw)} bytes. "
"Expected 64 bytes (pubkey + chain_code)."
)
try:
return HDPublicKey.from_primitive(raw)
except Exception as exc:
raise ValueError(f"xpub is not a valid extended public key: {exc}") from exc
def _is_hex(value: str) -> bool:
"""Return True if every character in value is a valid hex digit."""
if not value:
return False
try:
bytes.fromhex(value)
return True
except ValueError:
return False

102
cardano_checkout/invoice.py Normal file
View file

@ -0,0 +1,102 @@
"""Invoice state machine for Cardano-native merchant payments.
An Invoice represents one payment intent: a unique receive address
derived from the merchant's xpub, an expected amount in lovelace, a
USD-denominated label, and a lifecycle state that transitions as the
chain confirms payment.
The Invoice is deliberately framework-agnostic persistence is
delegated to an :class:`InvoiceStore` (see :mod:`cardano_checkout.store`).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
class InvoiceStatus(str, Enum):
"""Lifecycle states for a Cardano checkout invoice.
Valid transitions::
PENDING MATCHED CONFIRMED
UNDERPAID
OVERPAID (still moves to CONFIRMED but flagged)
EXPIRED (no payment within window)
CANCELLED (merchant-initiated)
CONFIRMED is terminal success; EXPIRED / CANCELLED are terminal failures.
UNDERPAID is recoverable if the customer sends the delta.
"""
PENDING = "pending"
MATCHED = "matched" # at least one UTxO landed, not yet k-confirmed
CONFIRMED = "confirmed" # enough confirmations, payment final
UNDERPAID = "underpaid" # delta < expected by more than tolerance
OVERPAID = "overpaid" # delta > expected by more than tolerance (non-fatal)
EXPIRED = "expired"
CANCELLED = "cancelled"
@dataclass
class Invoice:
"""One Cardano payment intent.
Attributes:
id: Stable per-merchant identifier (UUID or monotonic; caller's choice).
merchant_id: Opaque merchant namespace the SDK never interprets it.
derivation_index: BIP-44 receive-chain index used to derive receive_address.
receive_address: Bech32 address the customer pays to. Derived from the
merchant's xpub at ``derivation_index``.
expected_lovelace: Target amount. Set at creation time from the USD ADA
oracle snapshot; does NOT float with market price once set.
usd_amount: Human-readable label for what the customer owes.
status: Current lifecycle state. See :class:`InvoiceStatus`.
created_at: UTC timestamp when the invoice was created.
expires_at: UTC timestamp when this invoice stops accepting payment.
Typically created_at + 15 minutes for live-price quotes.
tx_hashes: All observed inbound tx hashes. Empty until first UTxO lands.
received_lovelace: Sum of inbound UTxOs at ``receive_address``.
confirmed_at: UTC timestamp when status became CONFIRMED.
metadata: Arbitrary merchant-defined payload (order id, sku, etc.).
"""
id: str
merchant_id: str
derivation_index: int
receive_address: str
expected_lovelace: int
usd_amount: float
status: InvoiceStatus = InvoiceStatus.PENDING
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
expires_at: Optional[datetime] = None
tx_hashes: list[str] = field(default_factory=list)
received_lovelace: int = 0
confirmed_at: Optional[datetime] = None
metadata: dict = field(default_factory=dict)
@property
def ada_amount(self) -> float:
"""Expected amount in ADA (lovelace / 1_000_000)."""
return self.expected_lovelace / 1_000_000
@property
def is_terminal(self) -> bool:
"""True if the invoice has reached a state it can't leave."""
return self.status in {
InvoiceStatus.CONFIRMED,
InvoiceStatus.EXPIRED,
InvoiceStatus.CANCELLED,
}
def is_expired(self, now: Optional[datetime] = None) -> bool:
"""True if wall-clock is past expires_at and status is still PENDING."""
if self.expires_at is None:
return False
current = now or datetime.now(timezone.utc)
return current >= self.expires_at and self.status == InvoiceStatus.PENDING

107
cardano_checkout/ipfs.py Normal file
View file

@ -0,0 +1,107 @@
"""Minimal IPFS client — upload + pin via kubo's HTTP API.
Designed for the ``chromaticcraft`` shape: a small local kubo daemon
runs alongside the web app, accepts uploads from end users (e.g. Abby
uploading a photo of a finished custom order), pins locally for fast
serving, and optionally mirrors pins to a second remote node
(Lucy-on-LAN) for archival redundancy.
No IPFS libraries are imported just httpx against the kubo REST API
(v0). Keeps the SDK surface minimal.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Optional
import httpx
logger = logging.getLogger(__name__)
@dataclass
class IPFSClient:
"""Kubo-compatible IPFS client.
Attributes:
api_url: Base URL of the kubo HTTP API (default ``http://127.0.0.1:5001``).
timeout: Per-request timeout in seconds (default 60 uploads can be slow).
mirror_api_urls: Optional list of additional kubo endpoints to
``pin add`` the CID on after a successful primary pin. Use this
to mirror to Lucy or any other archival node.
"""
api_url: str = "http://127.0.0.1:5001"
timeout: float = 60.0
mirror_api_urls: list[str] = None # type: ignore[assignment]
def __post_init__(self) -> None:
if self.mirror_api_urls is None:
self.mirror_api_urls = []
async def add(self, data: bytes, filename: str = "upload") -> str:
"""Upload bytes and pin them locally.
Args:
data: Raw bytes to add.
filename: Logical name used by clients browsing the DAG
(doesn't affect the CID).
Returns:
CID (base58, v0 or base32 v1 depending on kubo defaults).
Raises:
RuntimeError: If the daemon is unreachable or returns a non-2xx.
"""
url = f"{self.api_url.rstrip('/')}/api/v0/add"
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.post(
url,
files={"file": (filename, data, "application/octet-stream")},
params={"pin": "true", "cid-version": "1"},
)
if resp.status_code >= 400:
raise RuntimeError(f"ipfs add {resp.status_code}: {resp.text[:200]}")
# kubo's /add streams NDJSON; each line is one {Name, Hash, Size}.
# For a single file upload the last line carries the wrapping CID.
last_cid: Optional[str] = None
for line in resp.text.strip().splitlines():
if '"Hash"' in line:
import json
obj = json.loads(line)
last_cid = obj.get("Hash")
if not last_cid:
raise RuntimeError(f"ipfs add: no CID in response: {resp.text[:200]}")
# Mirror pins (best effort — a mirror failure should not poison the primary upload).
for mirror in self.mirror_api_urls:
try:
await self._pin_on(mirror, last_cid)
except Exception as exc:
logger.warning("[ipfs] mirror pin to %s failed for %s: %s", mirror, last_cid, exc)
return last_cid
async def _pin_on(self, api_url: str, cid: str) -> None:
"""Pin an existing CID on a remote kubo node."""
url = f"{api_url.rstrip('/')}/api/v0/pin/add"
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.post(url, params={"arg": cid})
if resp.status_code >= 400:
raise RuntimeError(f"pin/add {resp.status_code}: {resp.text[:200]}")
async def pin_bytes(
data: bytes,
api_url: str = "http://127.0.0.1:5001",
mirror_api_urls: Optional[list[str]] = None,
filename: str = "upload",
) -> str:
"""Convenience wrapper: one-shot upload + pin (+ optional mirror).
Returns the CID.
"""
client = IPFSClient(api_url=api_url, mirror_api_urls=mirror_api_urls or [])
return await client.add(data, filename=filename)

156
cardano_checkout/mint.py Normal file
View file

@ -0,0 +1,156 @@
"""CIP-25 v2 NFT certificate-of-authenticity minting.
This module produces the NFT cert attached to a confirmed merchant
order. One NFT per order, pinned-once metadata (image CID from IPFS
via :mod:`cardano_checkout.ipfs`), sent directly to the customer's
wallet in the same transaction.
Design decisions:
- **CIP-25 v2** (not CIP-68). CIP-25 is universally supported by
every Cardano wallet (Eternl, Lace, Yoroi, Vespr, Typhon). CIP-68
adds reference-NFT mutability we do not need for a static cert.
- **Single policy per merchant studio.** All of a studio's certs share
one policy_id so wallets group them cleanly. The policy key is a
native script under the studio's custody — Sulkta pattern is a
multi-sig native script stored on Lucy.
- **Policy has a time-lock** (invalid-after slot) so the "no more
editions can be minted after X" claim is cryptographically enforceable.
Recommended: generous lock (100 years) so policy_id stays stable,
but revokable in-contract via ``mint policy revoke`` flow.
- **No reference script, no Plutus.** Pure native scripts + standard
CIP-25 metadata keeps the tx cheap (~0.18 ADA fee + min-utxo for the
NFT output).
v0.1.0 ships the signature + stub. Full tx construction against a local
Ogmios endpoint lands in v0.2 once the store + monitor integration
tests pass.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class MintPolicy:
"""A native-script minting policy under the SDK's custody model.
Attributes:
policy_id: Hex blake2b-224 hash of the native script CBOR. Stable
for the life of the policy shipped with every cert minted
under it. Becomes the Cardano ``policy_id`` of the NFT asset.
script_cbor_hex: Hex-encoded CBOR of the native script itself.
Submitted alongside the mint tx witness.
signing_keys: Paths to skey files needed to sign the tx (all of
them, for an all-of multi-sig). The SDK does not read these
callers pass them to :mod:`cardano_checkout.txbuild` which
coordinates with an external signer (Lucy cold-store pattern).
locked_after_slot: Optional slot beyond which the policy rejects
further mints. None = no time lock (not recommended for
certificates a lock makes the "no more editions" claim
mathematically verifiable).
"""
policy_id: str
script_cbor_hex: str
signing_keys: list[str] = field(default_factory=list)
locked_after_slot: Optional[int] = None
async def mint_nft_cert(
policy: MintPolicy,
asset_name: str,
metadata: dict,
recipient_address: str,
ogmios_url: str = "http://127.0.0.1:1337",
network: str = "mainnet",
) -> str:
"""Mint a CIP-25 v2 NFT cert and send it to the recipient.
Constructs a transaction that:
1. Mints exactly 1 of ``{policy.policy_id}.{asset_name}``.
2. Sends that single token to ``recipient_address`` in its own UTxO.
3. Attaches CIP-25 v2 metadata under metadatum label 721.
4. Witnesses with all signing keys required by the policy.
Args:
policy: Merchant's minting policy.
asset_name: UTF-8 asset name (will be hex-encoded per CIP-25). Max 32 bytes.
metadata: CIP-25 metadata dict. At minimum should include ``name``,
``image`` (ipfs://CID), ``mediaType``, and any studio-specific
properties. The SDK wraps this into the proper ``{721: {policy_id: {asset_name: ...}}}``
envelope automatically.
recipient_address: Bech32 address of the wallet that receives the NFT.
ogmios_url: Endpoint for chain queries + tx submission.
network: "mainnet" or "testnet".
Returns:
Transaction hash (hex) once successfully submitted.
Raises:
NotImplementedError: v0.1.0 stub. Full implementation lands in v0.2.
"""
# v0.1.0 surface lock — implementation lands in v0.2 alongside txbuild.py
raise NotImplementedError(
"mint_nft_cert is stubbed in v0.1.0. Use cardano-cli or PyCardano "
"directly until v0.2 ships the full Ogmios-backed mint path."
)
def build_cip25_metadata(
policy_id: str,
asset_name: str,
name: str,
image_cid: str,
description: str = "",
media_type: str = "image/jpeg",
properties: Optional[dict] = None,
) -> dict:
"""Assemble the ``{721: {...}}`` metadatum envelope for a single NFT.
CIP-25 v2 image field takes an ``ipfs://<CID>`` URI. Description, if
longer than 64 characters, is split into an array of 64-char chunks
(CIP-25 constraint from the Cardano metadata schema strings larger
than 64 chars are encoded as a list of chunks).
Args:
policy_id: Hex policy id (same as on the asset).
asset_name: UTF-8 asset name used as the dict key under policy_id.
name: Human-readable NFT title (shown in wallets).
image_cid: IPFS CID the function prepends ``ipfs://``.
description: Optional longer text. Will be chunked if > 64 chars.
media_type: MIME type of the image. Default ``image/jpeg``.
properties: Additional key/value pairs merged into the metadata blob.
Returns:
Dict ready to submit as tx metadatum label 721.
"""
def chunk64(s: str) -> list[str]:
if len(s) <= 64:
return [s]
return [s[i : i + 64] for i in range(0, len(s), 64)]
desc: object = description
if isinstance(description, str) and len(description) > 64:
desc = chunk64(description)
body: dict = {
"name": name,
"image": f"ipfs://{image_cid}",
"mediaType": media_type,
}
if desc:
body["description"] = desc
if properties:
body.update(properties)
return {
"721": {
policy_id: {
asset_name: body,
},
"version": "2.0",
}
}

317
cardano_checkout/monitor.py Normal file
View file

@ -0,0 +1,317 @@
"""
Cardano UTXO Monitoring Service
Polls Koios API to detect on-chain payments at derived Cardano addresses.
Called by the scheduler every 15 seconds for pending payments, and every
60 seconds to reprice expired payment requests.
Koios endpoint used:
POST https://api.koios.rest/api/v1/address_utxos
Body: {"_addresses": ["addr1..."]}
Status flow applied here:
pending -> confirmed (received >= expected * 0.98)
pending -> underpaid (received > 0 but < expected * 0.98)
pending -> overpaid (received >= expected * 1.02 still confirmed)
pending -> expired (handled by reprice_expired_payments)
"""
import logging
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Optional
import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from models import CardanoPayment, Config, PlatformConfig
from services.cardano_price import (
convert_token_to_lovelace,
get_ada_usd_price,
convert_usd_to_lovelace,
KNOWN_TOKENS,
)
logger = logging.getLogger(__name__)
KOIOS_URL = "https://api.koios.rest/api/v1/address_utxos"
KOIOS_TIMEOUT = 15 # seconds
# Tolerance for confirming payment (2%)
CONFIRM_TOLERANCE = 0.98
OVERPAY_THRESHOLD = 1.02
# =============================================================================
# Koios API
# =============================================================================
async def _check_address_utxos(address: str) -> list[dict]:
"""
Query Koios for all UTXOs at the given Cardano address.
Returns a list of UTXO dicts from Koios, or an empty list on error.
Each UTXO has keys: tx_hash, tx_index, value (lovelace), asset_list.
"""
try:
async with httpx.AsyncClient(timeout=KOIOS_TIMEOUT) as client:
resp = await client.post(
KOIOS_URL,
json={"_addresses": [address]},
headers={"Accept": "application/json"},
)
resp.raise_for_status()
data = resp.json()
if not isinstance(data, list):
logger.warning("[cardano-monitor] Unexpected Koios response shape for %s", address[:20])
return []
return data
except httpx.HTTPStatusError as e:
logger.error(
"[cardano-monitor] Koios HTTP %s for %s: %s",
e.response.status_code, address[:20], e.response.text[:200],
)
return []
except httpx.TimeoutException:
logger.warning("[cardano-monitor] Koios timeout for address %s", address[:20])
return []
except Exception as e:
logger.error("[cardano-monitor] Koios unexpected error for %s: %s", address[:20], e)
return []
# =============================================================================
# Payment evaluation
# =============================================================================
async def _evaluate_payment(payment: CardanoPayment, utxos: list[dict]) -> tuple[str, int, int, dict, Optional[str]]:
"""
Evaluate UTXOs against the expected payment and determine new status.
Returns:
(new_status, received_lovelace, total_value_lovelace, received_assets, tx_hash)
Status rules:
- No UTXOs -> "pending" (no change)
- total_value >= expected * OVERPAY_THRESHOLD -> "overpaid" (treated as confirmed)
- total_value >= expected * CONFIRM_TOLERANCE -> "confirmed"
- total_value > 0 but below tolerance -> "underpaid"
"""
if not utxos:
return "pending", 0, 0, {}, None
raw_lovelace = 0
received_assets: dict[str, int] = {}
latest_tx_hash: Optional[str] = None
for utxo in utxos:
# Sum ADA (lovelace)
try:
raw_lovelace += int(utxo.get("value", 0))
except (ValueError, TypeError):
pass
# Track latest tx_hash
tx = utxo.get("tx_hash")
if tx:
latest_tx_hash = tx
# Collect native assets
for asset in utxo.get("asset_list", []) or []:
policy_id = asset.get("policy_id", "")
asset_name = asset.get("asset_name", "")
asset_id = f"{policy_id}.{asset_name}"
try:
qty = int(asset.get("quantity", 0))
except (ValueError, TypeError):
qty = 0
if qty > 0:
received_assets[asset_id] = received_assets.get(asset_id, 0) + qty
# Convert native assets to lovelace equivalent
asset_lovelace = 0
for asset_id, qty in received_assets.items():
if "." not in asset_id:
continue
policy_id, asset_name_hex = asset_id.split(".", 1)
# Find matching known token for decimals
decimals = 0
for token_info in KNOWN_TOKENS.values():
if token_info.get("policy_id") == policy_id:
decimals = token_info.get("decimals", 0)
break
try:
lv = await convert_token_to_lovelace(policy_id, asset_name_hex, qty, decimals)
if lv is not None:
asset_lovelace += lv
except Exception as e:
logger.warning("[cardano-monitor] Failed to convert asset %s to lovelace: %s", asset_id[:20], e)
total_value = raw_lovelace + asset_lovelace
expected = payment.expected_lovelace or 0
if expected == 0:
# Degenerate case — treat any payment as confirmed
new_status = "confirmed"
elif total_value >= expected * OVERPAY_THRESHOLD:
new_status = "overpaid"
elif total_value >= expected * CONFIRM_TOLERANCE:
new_status = "confirmed"
elif total_value > 0:
new_status = "underpaid"
else:
new_status = "pending"
return new_status, raw_lovelace, total_value, received_assets, latest_tx_hash
# =============================================================================
# Main monitoring functions (called by scheduler)
# =============================================================================
async def check_pending_payments(db: AsyncSession) -> None:
"""
Check all pending payments that haven't expired yet.
Queries Koios for UTXOs at each address. Updates payment status in place.
"""
now = datetime.now(timezone.utc)
result = await db.execute(
select(CardanoPayment).where(
CardanoPayment.status == "pending",
CardanoPayment.expires_at > now,
)
)
payments = result.scalars().all()
if not payments:
return
logger.debug("[cardano-monitor] Checking %d pending payment(s)", len(payments))
for payment in payments:
try:
utxos = await _check_address_utxos(payment.address)
new_status, raw_lovelace, total_value, received_assets, tx_hash = await _evaluate_payment(payment, utxos)
if new_status == payment.status and raw_lovelace == 0:
# No change, no UTXOs — skip DB write
continue
payment.received_lovelace = raw_lovelace
payment.total_value_lovelace = total_value
payment.received_assets = received_assets
if tx_hash:
payment.tx_hash = tx_hash
if new_status != payment.status:
old_status = payment.status
payment.status = new_status
if new_status in ("confirmed", "overpaid"):
payment.confirmed_at = now
logger.info(
"[cardano-monitor] payment #%d invoice_id=%d: %s -> %s (%.6f ADA received, %.6f ADA total value)",
payment.id,
payment.invoice_id or 0,
old_status,
new_status,
raw_lovelace / 1_000_000,
total_value / 1_000_000,
)
except Exception as e:
logger.exception(
"[cardano-monitor] Error checking payment #%d: %s", payment.id, e
)
await db.commit()
async def reprice_expired_payments(db: AsyncSession) -> None:
"""
Reprice payments whose window has expired.
Fetches the current ADA price, recalculates expected_lovelace, resets
expires_at to now + payment_window_minutes, and increments repriced_count.
Gives up after 3 repricings to avoid infinite loops.
"""
now = datetime.now(timezone.utc)
result = await db.execute(
select(CardanoPayment).where(
CardanoPayment.status == "pending",
CardanoPayment.expires_at <= now,
CardanoPayment.repriced_count < 3,
)
)
payments = result.scalars().all()
if not payments:
return
logger.info("[cardano-monitor] Repricing %d expired payment(s)", len(payments))
ada_price = await get_ada_usd_price()
if ada_price <= 0:
logger.warning("[cardano-monitor] Cannot reprice — ADA price unavailable")
return
# Read platform payment window
pc_result = await db.execute(
select(PlatformConfig).where(PlatformConfig.key == "cardano_payment_window_minutes")
)
pc = pc_result.scalar_one_or_none()
try:
window_minutes = int(pc.value) if pc and pc.value else 15
except (ValueError, TypeError):
window_minutes = 15
new_expires_at = now + timedelta(minutes=window_minutes)
for payment in payments:
try:
total_usd = float(payment.expected_usd or 0)
if total_usd <= 0:
payment.status = "expired"
logger.warning("[cardano-monitor] payment #%d has no expected_usd — marking expired", payment.id)
continue
new_lovelace = await convert_usd_to_lovelace(total_usd)
if new_lovelace == 0:
logger.warning("[cardano-monitor] payment #%d: lovelace conversion returned 0, skipping", payment.id)
continue
old_lovelace = payment.expected_lovelace
payment.expected_lovelace = new_lovelace
payment.ada_price_usd = Decimal(str(round(ada_price, 4)))
payment.expires_at = new_expires_at
payment.repriced_count += 1
logger.info(
"[cardano-monitor] Repriced payment #%d: %d -> %d lovelace (ADA=$%.4f, reprice #%d)",
payment.id, old_lovelace or 0, new_lovelace, ada_price, payment.repriced_count,
)
except Exception as e:
logger.exception("[cardano-monitor] Error repricing payment #%d: %s", payment.id, e)
# Mark payments that have exceeded max repricings as expired
expired_result = await db.execute(
select(CardanoPayment).where(
CardanoPayment.status == "pending",
CardanoPayment.expires_at <= now,
CardanoPayment.repriced_count >= 3,
)
)
for payment in expired_result.scalars().all():
payment.status = "expired"
logger.info("[cardano-monitor] payment #%d marked expired after %d repricings", payment.id, payment.repriced_count)
await db.commit()

346
cardano_checkout/oracles.py Normal file
View file

@ -0,0 +1,346 @@
"""
Cardano Token Price Service Phase 2 of the Cardano payments system.
Provides cached ADA/USD and token/ADA price lookups used to convert
invoice amounts into lovelace (ADA's base unit) for payment requests.
Data sources:
- ADA/USD: CoinGecko free API (no key required, rate-limited)
- Token/ADA: DexHunter v2 API (DEX aggregator on Cardano)
Cache strategy: module-level dict with timestamps. TTL = 5 minutes.
All functions are async, never raise return None/0 on failure.
"""
import logging
import time
from typing import Optional
import httpx
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Token registry
# ---------------------------------------------------------------------------
KNOWN_TOKENS: dict[str, dict] = {
"ada": {
"policy_id": "",
"asset_name": "",
"ticker": "ADA",
"decimals": 6,
"type": "native",
},
"djed": {
"policy_id": "8db269c3ec630e06ae29f74bc39edd1f87c819f1056206e879a1cd61",
"asset_name": "444a4544", # "DJED".encode().hex()
"ticker": "DJED",
"decimals": 6,
"type": "stablecoin",
},
"iusd": {
"policy_id": "f66d78b4a3cb3d37afa0ec36461e51ecbde00f26c8f0a68f94b69880",
"asset_name": "69555344", # "iUSD".encode().hex()
"ticker": "iUSD",
"decimals": 6,
"type": "stablecoin",
},
"night": {
"policy_id": "0691b2fecca1ac4f53cb6dfb00b7013e561d1f34403b957cbb5af1fa",
"asset_name": "4e49474854", # "NIGHT".encode().hex()
"ticker": "NIGHT",
"decimals": 6,
"type": "utility",
},
"snek": {
"policy_id": "279c909f348e533da5808898f87f9a14bb2c3dfbbacccd631d927a3f",
"asset_name": "534e454b", # "SNEK".encode().hex()
"ticker": "SNEK",
"decimals": 0,
"type": "meme",
},
"iag": {
"policy_id": "5d16944c1e00a5fa1d14ba2460709bc2e41a18e8e1b86a1e7a09da09",
"asset_name": "494147", # "IAG".encode().hex()
"ticker": "IAG",
"decimals": 6,
"type": "utility",
},
}
# ---------------------------------------------------------------------------
# Internal cache — { key: (value, fetched_at_unix) }
# ---------------------------------------------------------------------------
_CACHE: dict[str, tuple] = {}
_CACHE_TTL_SECONDS = 300 # 5 minutes
def _cache_get(key: str) -> Optional[float]:
"""Return cached value if still fresh, else None."""
entry = _CACHE.get(key)
if entry is None:
return None
value, fetched_at = entry
if time.monotonic() - fetched_at > _CACHE_TTL_SECONDS:
return None
return value
def _cache_set(key: str, value: float) -> None:
"""Store value in cache with current timestamp."""
_CACHE[key] = (value, time.monotonic())
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def get_ada_usd_price() -> float:
"""
Fetch the current ADA/USD price from CoinGecko.
Caches result for 5 minutes. Returns 0.0 on failure callers should
treat 0.0 as a signal that pricing is unavailable.
Endpoint: GET https://api.coingecko.com/api/v3/simple/price
"""
cache_key = "ada_usd"
cached = _cache_get(cache_key)
if cached is not None:
return cached
url = "https://api.coingecko.com/api/v3/simple/price"
params = {"ids": "cardano", "vs_currencies": "usd"}
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url, params=params)
resp.raise_for_status()
data = resp.json()
price = float(data["cardano"]["usd"])
except httpx.HTTPStatusError as e:
logger.error(
"[cardano_price] CoinGecko request failed: %s %s",
e.response.status_code,
e.response.text[:200],
)
return 0.0
except (KeyError, ValueError, TypeError) as e:
logger.error("[cardano_price] CoinGecko response parse error: %s", e)
return 0.0
except Exception as e:
logger.error("[cardano_price] CoinGecko unexpected error: %s", e)
return 0.0
logger.debug("[cardano_price] ADA/USD = %.6f (live)", price)
_cache_set(cache_key, price)
return price
async def get_token_ada_price(policy_id: str, asset_name_hex: str) -> Optional[float]:
"""
Fetch the price of a Cardano native token in ADA from DexHunter.
Tries the DexHunter v2 bestPool endpoint first, then falls back to the
community pair endpoint. Both return the token's ADA price per base unit.
Args:
policy_id: The token's Cardano policy ID (hex string).
asset_name_hex: The token's asset name as a hex-encoded string.
Derive with: token_ticker.encode().hex()
Returns:
Price in ADA per base unit of the token, or None if no liquidity /
not found / request failed.
Cache: 5 minutes per (policy_id, asset_name_hex) pair.
"""
if not policy_id or asset_name_hex is None:
# ADA itself — price is 1 ADA by definition
return 1.0
asset_id = f"{policy_id}{asset_name_hex}"
cache_key = f"token_ada:{asset_id}"
cached = _cache_get(cache_key)
if cached is not None:
return cached
price: Optional[float] = None
# --- Attempt 1: DexHunter v2 bestPool ---
try:
url = "https://api-v2.dexhunter.io/swap/bestPool"
params = {"tokenA": "lovelace", "tokenB": asset_id}
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url, params=params)
resp.raise_for_status()
data = resp.json()
# DexHunter returns price_a_per_b or price_b_per_a depending on direction.
# We want ADA per token — look for the field that represents that.
raw_price = (
data.get("price_b_per_a") # token per lovelace inverse
or data.get("price_a_per_b") # ada per token
or data.get("price")
)
if raw_price is not None:
candidate = float(raw_price)
# bestPool returns lovelace-denominated prices — convert to ADA
# If the value is very large (>1000), it's likely lovelace/token, invert & divide
if candidate > 1000:
price = 1_000_000 / candidate # lovelace per token → ADA per token
else:
price = candidate
logger.debug("[cardano_price] %s bestPool price = %.8f ADA", asset_id[:20], price)
except httpx.HTTPStatusError as e:
if e.response.status_code not in (404, 422):
logger.warning(
"[cardano_price] DexHunter bestPool error %s for %s",
e.response.status_code,
asset_id[:20],
)
except Exception as e:
logger.warning("[cardano_price] DexHunter bestPool failed for %s: %s", asset_id[:20], e)
# --- Attempt 2: DexHunter community pair endpoint (fallback) ---
if price is None:
try:
url = f"https://api.dexhunter.io/community/pair/{asset_id}"
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url)
resp.raise_for_status()
data = resp.json()
raw_price = (
data.get("price_ada")
or data.get("priceAda")
or data.get("price")
)
if raw_price is not None:
price = float(raw_price)
logger.debug(
"[cardano_price] %s community pair price = %.8f ADA",
asset_id[:20],
price,
)
except httpx.HTTPStatusError as e:
if e.response.status_code not in (404, 422):
logger.warning(
"[cardano_price] DexHunter community error %s for %s",
e.response.status_code,
asset_id[:20],
)
except Exception as e:
logger.warning("[cardano_price] DexHunter community failed for %s: %s", asset_id[:20], e)
if price is not None and price > 0:
_cache_set(cache_key, price)
return price
logger.info("[cardano_price] No price found for %s (no liquidity or unsupported)", asset_id[:20])
return None
async def convert_usd_to_lovelace(usd_amount: float) -> int:
"""
Convert a USD amount to lovelace using the current ADA/USD price.
1 ADA = 1,000,000 lovelace.
Args:
usd_amount: Amount in USD (e.g. 49.99).
Returns:
Equivalent lovelace as an integer, or 0 if ADA price is unavailable.
Example:
>>> await convert_usd_to_lovelace(10.00)
# At ADA = $0.45 → 10 / 0.45 ADA → 22,222,222 lovelace
"""
if usd_amount <= 0:
return 0
ada_usd = await get_ada_usd_price()
if ada_usd <= 0:
logger.error("[cardano_price] Cannot convert USD to lovelace — ADA price unavailable")
return 0
ada_amount = usd_amount / ada_usd
lovelace = int(ada_amount * 1_000_000)
logger.debug(
"[cardano_price] $%.2f USD → %.6f ADA → %d lovelace (rate: $%.6f/ADA)",
usd_amount,
ada_amount,
lovelace,
ada_usd,
)
return lovelace
async def convert_token_to_lovelace(
policy_id: str,
asset_name_hex: str,
token_quantity: int,
token_decimals: int = 0,
) -> Optional[int]:
"""
Convert a raw token quantity to its equivalent lovelace value.
Uses the token's ADA price from DexHunter and accounts for decimal
precision so that, for example, 1,000,000 units of a 6-decimal token
equals 1.0 whole token.
Args:
policy_id: Token policy ID.
asset_name_hex: Token asset name as hex (e.g. "534e454b" for SNEK).
token_quantity: Raw on-chain token quantity (base units, not decimal-adjusted).
token_decimals: Number of decimal places for the token (default 0).
Returns:
Equivalent lovelace as an integer, or None if price is unavailable.
Example:
# NIGHT token at 0.001 ADA/NIGHT, 6 decimals
# quantity = 5_000_000 (= 5.0 NIGHT), price = 0.001 ADA/token
# → 5.0 * 0.001 ADA = 0.005 ADA = 5,000 lovelace
>>> await convert_token_to_lovelace(policy_id, asset_name_hex, 5_000_000, 6)
5000
"""
if token_quantity <= 0:
return 0
# ADA is always 1:1 with itself in lovelace terms
if not policy_id and not asset_name_hex:
return token_quantity # already in lovelace
token_ada_price = await get_token_ada_price(policy_id, asset_name_hex)
if token_ada_price is None:
logger.warning(
"[cardano_price] Cannot convert token to lovelace — no price for %s%s",
policy_id[:12],
asset_name_hex[:8],
)
return None
# Adjust for decimals: base_units / 10^decimals = whole tokens
whole_tokens = token_quantity / (10 ** token_decimals)
# Whole tokens × ADA per token × lovelace per ADA
lovelace = int(whole_tokens * token_ada_price * 1_000_000)
logger.debug(
"[cardano_price] %d base units (decimals=%d) → %.6f tokens × %.8f ADA → %d lovelace",
token_quantity,
token_decimals,
whole_tokens,
token_ada_price,
lovelace,
)
return lovelace

View file

@ -0,0 +1,465 @@
"""
Cardano Payment Monitoring Scheduler
APScheduler integration that runs five recurring jobs:
- check_pending_payments every 15 seconds
- reprice_expired_payments every 60 seconds
- _check_subscription_payments every 60 seconds
- _reprice_subscription_payments every 6 hours
- _enforce_grace_period daily at 06:00 UTC
Usage in main.py lifespan:
from services.cardano_scheduler import start_cardano_scheduler, stop_cardano_scheduler
@asynccontextmanager
async def lifespan(app: FastAPI):
await start_cardano_scheduler()
yield
await stop_cardano_scheduler()
"""
import logging
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select
from database import async_session_maker
from services.cardano_monitor import (
_check_address_utxos,
_evaluate_payment,
check_pending_payments,
reprice_expired_payments,
)
from services.cardano_price import convert_usd_to_lovelace, get_ada_usd_price
logger = logging.getLogger(__name__)
_scheduler: Optional[AsyncIOScheduler] = None
# =============================================================================
# Scheduled job wrappers — invoice payments
# =============================================================================
async def _job_check_pending() -> None:
"""Scheduler wrapper for check_pending_payments."""
try:
async with async_session_maker() as db:
await check_pending_payments(db)
except Exception:
logger.exception("[cardano-scheduler] check_pending_payments job failed")
async def _job_reprice_expired() -> None:
"""Scheduler wrapper for reprice_expired_payments."""
try:
async with async_session_maker() as db:
await reprice_expired_payments(db)
except Exception:
logger.exception("[cardano-scheduler] reprice_expired_payments job failed")
# =============================================================================
# Scheduled job wrappers — subscription payments
# =============================================================================
async def _job_check_subscription_payments() -> None:
"""
Poll Koios for UTXOs at awaiting_payment subscription addresses.
Reuses _check_address_utxos and _evaluate_payment from cardano_monitor.
On confirmation, advances subscription status to 'active' and updates
company.subscription_tier to match the subscription's tier.
"""
from models import Company, Subscription, SubscriptionPayment
try:
async with async_session_maker() as db:
now = datetime.now(timezone.utc)
result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status.in_(["awaiting_payment", "underpaid"]),
SubscriptionPayment.expires_at > now,
)
)
payments = result.scalars().all()
if not payments:
return
logger.debug(
"[cardano-scheduler] Checking %d subscription payment(s)", len(payments)
)
for sp in payments:
try:
utxos = await _check_address_utxos(sp.address)
# _evaluate_payment expects a CardanoPayment-like object.
# SubscriptionPayment has the same fields it needs.
new_status, raw_lovelace, total_value, received_assets, tx_hash = (
await _evaluate_payment(sp, utxos)
)
# Map monitor statuses to subscription payment statuses
status_map = {
"pending": "awaiting_payment",
"confirmed": "confirmed",
"overpaid": "overpaid",
"underpaid": "underpaid",
}
mapped_status = status_map.get(new_status, new_status)
if mapped_status == sp.status and raw_lovelace == 0:
continue
sp.received_lovelace = raw_lovelace
sp.total_value_lovelace = total_value
sp.received_assets = received_assets
if tx_hash:
sp.tx_hash = tx_hash
if mapped_status != sp.status:
old_status = sp.status
sp.status = mapped_status
if mapped_status in ("confirmed", "overpaid"):
sp.confirmed_at = now
# Advance subscription + company tier
if sp.subscription_id:
sub_result = await db.execute(
select(Subscription).where(
Subscription.id == sp.subscription_id
)
)
sub = sub_result.scalar_one_or_none()
if sub:
sub.status = "active"
sub.updated_at = now
# Apply any pending tier downgrade
if sub.pending_tier and sp.period_end and sp.period_end <= now.date():
sub.tier = sub.pending_tier
sub.pending_tier = None
sub.pending_tier_at = None
company_result = await db.execute(
select(Company).where(Company.id == sp.company_id)
)
company = company_result.scalar_one_or_none()
if company:
# Get current sub tier
sub_result2 = await db.execute(
select(Subscription).where(
Subscription.company_id == sp.company_id
)
)
sub2 = sub_result2.scalar_one_or_none()
if sub2:
company.subscription_tier = sub2.tier
company.subscription_status = "active"
logger.info(
"[cardano-scheduler] sub_payment #%d company_id=%d: %s -> %s"
" (%.6f ADA received)",
sp.id,
sp.company_id,
old_status,
mapped_status,
raw_lovelace / 1_000_000,
)
except Exception as e:
logger.exception(
"[cardano-scheduler] Error checking sub_payment #%d: %s", sp.id, e
)
await db.commit()
except Exception:
logger.exception("[cardano-scheduler] _check_subscription_payments job failed")
async def _job_reprice_subscription_payments() -> None:
"""
Reprice awaiting_payment subscription records whose 24-hour window has expired.
Fetches the current ADA price, recalculates expected_lovelace, resets expires_at
to now + 24 hours, and increments repriced_count. Gives up after 3 repricings.
"""
from models import SubscriptionPayment
try:
async with async_session_maker() as db:
now = datetime.now(timezone.utc)
result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status == "awaiting_payment",
SubscriptionPayment.expires_at <= now,
SubscriptionPayment.repriced_count < 3,
)
)
payments = result.scalars().all()
if not payments:
return
logger.info(
"[cardano-scheduler] Repricing %d subscription payment(s)", len(payments)
)
ada_price = await get_ada_usd_price()
if ada_price <= 0:
logger.warning(
"[cardano-scheduler] Cannot reprice subscriptions — ADA price unavailable"
)
return
new_expires_at = now + timedelta(hours=24)
for sp in payments:
try:
total_usd = float(sp.expected_usd or 0)
if total_usd <= 0:
sp.status = "expired"
logger.warning(
"[cardano-scheduler] sub_payment #%d has no expected_usd — expired",
sp.id,
)
continue
new_lovelace = await convert_usd_to_lovelace(total_usd)
if new_lovelace == 0:
logger.warning(
"[cardano-scheduler] sub_payment #%d: lovelace conversion returned 0, skipping",
sp.id,
)
continue
old_lovelace = sp.expected_lovelace
sp.expected_lovelace = new_lovelace
sp.ada_price_usd = Decimal(str(round(ada_price, 4)))
sp.expires_at = new_expires_at
sp.repriced_count += 1
logger.info(
"[cardano-scheduler] Repriced sub_payment #%d: %d -> %d lovelace"
" (ADA=$%.4f, reprice #%d)",
sp.id,
old_lovelace or 0,
new_lovelace,
ada_price,
sp.repriced_count,
)
except Exception as e:
logger.exception(
"[cardano-scheduler] Error repricing sub_payment #%d: %s", sp.id, e
)
# Mark max-repriced payments as expired
expired_result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status == "awaiting_payment",
SubscriptionPayment.expires_at <= now,
SubscriptionPayment.repriced_count >= 3,
)
)
for sp in expired_result.scalars().all():
sp.status = "expired"
logger.info(
"[cardano-scheduler] sub_payment #%d expired after %d repricings",
sp.id,
sp.repriced_count,
)
await db.commit()
except Exception:
logger.exception("[cardano-scheduler] _reprice_subscription_payments job failed")
async def _job_enforce_grace_period() -> None:
"""
Daily enforcement of subscription grace periods (runs at 06:00 UTC).
Rules:
- If due_date has passed and payment is not confirmed: mark subscription past_due
- If grace_deadline has passed and payment still not confirmed: suspend subscription
and update company.subscription_status accordingly
"""
from models import Company, Subscription, SubscriptionPayment
try:
async with async_session_maker() as db:
today = datetime.now(timezone.utc).date()
# Find subscriptions that are active/past_due and have an overdue payment
overdue_result = await db.execute(
select(SubscriptionPayment).where(
SubscriptionPayment.status.in_(["awaiting_payment", "underpaid", "expired"]),
SubscriptionPayment.due_date < today,
)
)
overdue_payments = overdue_result.scalars().all()
for sp in overdue_payments:
try:
sub_result = await db.execute(
select(Subscription).where(
Subscription.company_id == sp.company_id
)
)
sub = sub_result.scalar_one_or_none()
if not sub or sub.status in ("cancelled", "suspended"):
continue
company_result = await db.execute(
select(Company).where(Company.id == sp.company_id)
)
company = company_result.scalar_one_or_none()
# Grace deadline passed → suspend
if sp.grace_deadline and today > sp.grace_deadline:
if sub.status != "suspended":
sub.status = "suspended"
sub.updated_at = datetime.now(timezone.utc)
if company:
company.subscription_status = "suspended"
logger.info(
"[cardano-scheduler] company_id=%d subscription suspended"
" (grace deadline %s passed)",
sp.company_id,
sp.grace_deadline,
)
# Due date passed but within grace → mark past_due
elif sub.status == "active":
sub.status = "past_due"
sub.updated_at = datetime.now(timezone.utc)
if company:
company.subscription_status = "past_due"
logger.info(
"[cardano-scheduler] company_id=%d subscription past_due"
" (due_date %s passed)",
sp.company_id,
sp.due_date,
)
except Exception as e:
logger.exception(
"[cardano-scheduler] Error enforcing grace period for sub_payment #%d: %s",
sp.id,
e,
)
await db.commit()
except Exception:
logger.exception("[cardano-scheduler] _enforce_grace_period job failed")
# =============================================================================
# Lifecycle
# =============================================================================
async def start_cardano_scheduler() -> None:
"""
Start the Cardano payment monitoring scheduler.
Registers five jobs:
- check_pending_payments: every 15 seconds
- reprice_expired_payments: every 60 seconds
- check_subscription_payments: every 60 seconds
- reprice_subscription_payments: every 6 hours
- enforce_grace_period: daily at 06:00 UTC
Safe to call multiple times skips if already running.
"""
global _scheduler
if _scheduler and _scheduler.running:
logger.debug("[cardano-scheduler] Already running — skipping start")
return
_scheduler = AsyncIOScheduler()
_scheduler.add_job(
_job_check_pending,
trigger=IntervalTrigger(seconds=15),
id="cardano_check_pending",
name="Cardano: Check Pending Payments",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_scheduler.add_job(
_job_reprice_expired,
trigger=IntervalTrigger(seconds=60),
id="cardano_reprice_expired",
name="Cardano: Reprice Expired Payments",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_scheduler.add_job(
_job_check_subscription_payments,
trigger=IntervalTrigger(seconds=60),
id="cardano_check_sub_payments",
name="Cardano: Check Subscription Payments",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_scheduler.add_job(
_job_reprice_subscription_payments,
trigger=IntervalTrigger(hours=6),
id="cardano_reprice_sub_payments",
name="Cardano: Reprice Subscription Payments",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_scheduler.add_job(
_job_enforce_grace_period,
trigger=CronTrigger(hour=6, minute=0, timezone="UTC"),
id="cardano_enforce_grace",
name="Cardano: Enforce Subscription Grace Periods",
replace_existing=True,
max_instances=1,
coalesce=True,
)
_scheduler.start()
logger.info(
"[cardano-scheduler] Started — check_pending every 15s, reprice_expired every 60s,"
" check_sub_payments every 60s, reprice_sub_payments every 6h,"
" enforce_grace_period daily at 06:00 UTC"
)
async def stop_cardano_scheduler() -> None:
"""
Gracefully stop the Cardano scheduler.
Call this in the FastAPI lifespan shutdown block.
"""
global _scheduler
if _scheduler:
_scheduler.shutdown(wait=False)
_scheduler = None
logger.info("[cardano-scheduler] Stopped")

69
cardano_checkout/store.py Normal file
View file

@ -0,0 +1,69 @@
"""Persistence abstraction for Invoice objects.
The SDK does not prescribe a database. Consumers implement
:class:`InvoiceStore` against whatever backend suits them SQLAlchemy
(TradeCraft pattern), SQLite (chromaticcraft pattern), Postgres raw
(ADAMaps pattern), in-memory dict (tests).
All methods are async so the same Protocol works cleanly for both
asyncpg/asyncio-sqlalchemy backends and synchronous backends wrapped
with ``asyncio.to_thread``.
"""
from __future__ import annotations
from typing import Optional, Protocol, runtime_checkable
from cardano_checkout.invoice import Invoice, InvoiceStatus
@runtime_checkable
class InvoiceStore(Protocol):
"""Persistence backend for invoices.
Consumers implement the six methods below. The SDK's monitor + scheduler
modules operate entirely through this interface, never touching a specific
ORM or driver.
"""
async def create(self, invoice: Invoice) -> None:
"""Insert a new invoice. Should raise if an invoice with the same id exists."""
...
async def get(self, invoice_id: str) -> Optional[Invoice]:
"""Fetch one invoice by id. Returns None if not found."""
...
async def list_by_status(
self, status: InvoiceStatus, limit: int = 100
) -> list[Invoice]:
"""List invoices in a given state, newest-first. Used by the monitor poll loop."""
...
async def update(self, invoice: Invoice) -> None:
"""Persist the current state of an invoice.
Implementations should compare-and-set on ``invoice.id`` if the row
doesn't exist the call should raise. Does NOT create; see :meth:`create`.
"""
...
async def next_derivation_index(self, merchant_id: str) -> int:
"""Return the next unused receive-address index for a merchant.
Should be transactionally safe against concurrent invoice creation;
consumers typically implement this via ``SELECT COALESCE(MAX(index), -1) + 1 ... FOR UPDATE``
or an atomic counter row.
"""
...
async def record_tx(
self, invoice_id: str, tx_hash: str, lovelace_delta: int
) -> None:
"""Record an observed inbound UTxO against an invoice.
Must be idempotent on (invoice_id, tx_hash) monitor loops will
re-observe the same UTxO until the invoice transitions to a terminal
state.
"""
...

View file

@ -0,0 +1,38 @@
"""Transaction construction helpers wrapping PyCardano.
v0.1.0 surface stub the chromaticcraft Phase-2 sprint will fill these in
with:
- Ogmios-backed `ChainContext` (via PyCardano's OgmiosChainContext)
- Build-transaction helpers for (a) plain ADA payment refunds, (b)
native-token mint+send, (c) reference-asset clones
- Cold-signer hand-off shape matching the ADAMaps payout pattern:
build_body_on_hot transfer via temp dir sign_offline_on_cold
return signed_witness submit_from_hot.
Exists as a named module in v0.1 so consumers can import the stable path
without having to update imports later.
"""
from __future__ import annotations
def _v0_2_sentinel(_name: str) -> None:
raise NotImplementedError(
f"txbuild.{_name} ships in cardano-checkout v0.2 alongside the "
"Ogmios chain-context wiring and cold-signer hand-off."
)
def build_mint_tx(*args, **kwargs): # noqa: D401, ANN002, ANN003
"""Build an unsigned mint transaction. v0.2."""
_v0_2_sentinel("build_mint_tx")
def build_payment_tx(*args, **kwargs): # noqa: D401, ANN002, ANN003
"""Build an unsigned payment transaction (e.g. refund path). v0.2."""
_v0_2_sentinel("build_payment_tx")
def submit_signed_tx(*args, **kwargs): # noqa: D401, ANN002, ANN003
"""Submit a signed tx to Ogmios. v0.2."""
_v0_2_sentinel("submit_signed_tx")

47
pyproject.toml Normal file
View file

@ -0,0 +1,47 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "cardano-checkout"
version = "0.1.0.dev0"
description = "Merchant-side Cardano payments SDK + NFT cert-of-authenticity minting (zero-custody)"
readme = "README.md"
requires-python = ">=3.10"
license = {text = "Apache-2.0"}
authors = [
{name = "Sulkta Coop"},
]
keywords = ["cardano", "payments", "nft", "checkout", "blockchain", "ada", "pycardano"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Office/Business :: Financial",
"Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = [
"pycardano>=0.11.0",
"httpx>=0.27",
"apscheduler>=3.10",
]
[project.optional-dependencies]
sqlalchemy = ["sqlalchemy>=2.0"]
test = ["pytest>=7", "pytest-asyncio>=0.23"]
dev = ["pytest>=7", "pytest-asyncio>=0.23", "ruff", "mypy"]
[project.urls]
Repository = "http://192.168.0.5:3001/Sulkta-Coop/cardano-checkout-py"
[tool.setuptools.packages.find]
include = ["cardano_checkout*"]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

65
tests/test_addresses.py Normal file
View file

@ -0,0 +1,65 @@
"""Deterministic address-derivation smoke test.
Uses a known test-vector xpub (the one shipped in the pycardano docs)
to assert the derived addresses are stable and reproducible across SDK
versions. If this test ever changes output, we have a backwards-compat
problem that would break every merchant's receive-address history.
"""
from __future__ import annotations
import pytest
from cardano_checkout import addresses
# Public test vector — a CIP-1852 account extended public key.
# 64 bytes = 32 bytes Ed25519 pubkey || 32 bytes chain code, hex encoded.
# This particular key is drawn from pycardano's own test suite fixtures.
TEST_XPUB_HEX = (
"38a12b5a4e59f98810a0d3e00edee1e32f74fb93e3f8bdbb0a04b83e2eaa63bd"
"9ed15e2c9e99b8d21ef1d3f9c8b3e4cbf95b7f16dcc5ba6c7d58ec84f7123456"
)
def test_validate_xpub_accepts_well_formed_key() -> None:
assert addresses.validate_xpub(TEST_XPUB_HEX) is True
def test_validate_xpub_rejects_empty_and_junk() -> None:
assert addresses.validate_xpub("") is False
assert addresses.validate_xpub("notreallyhex!!") is False
assert addresses.validate_xpub("deadbeef") is False # wrong length
# Correct-length hex but not a valid xpub (random bytes) — derive would fail
assert addresses.validate_xpub("aa" * 64) is False
def test_derive_address_is_deterministic() -> None:
a0 = addresses.derive_address(TEST_XPUB_HEX, index=0, network="mainnet")
a0_again = addresses.derive_address(TEST_XPUB_HEX, index=0, network="mainnet")
assert a0 == a0_again
assert a0.startswith("addr1")
def test_derive_address_distinct_per_index() -> None:
a0 = addresses.derive_address(TEST_XPUB_HEX, index=0, network="mainnet")
a1 = addresses.derive_address(TEST_XPUB_HEX, index=1, network="mainnet")
a42 = addresses.derive_address(TEST_XPUB_HEX, index=42, network="mainnet")
assert a0 != a1 != a42
def test_derive_address_network_switch_changes_prefix() -> None:
mainnet = addresses.derive_address(TEST_XPUB_HEX, index=0, network="mainnet")
testnet = addresses.derive_address(TEST_XPUB_HEX, index=0, network="testnet")
assert mainnet.startswith("addr1")
assert testnet.startswith("addr_test1")
def test_derive_address_rejects_negative_index() -> None:
with pytest.raises(ValueError, match="non-negative"):
addresses.derive_address(TEST_XPUB_HEX, index=-1)
def test_derive_address_rejects_bad_network() -> None:
with pytest.raises(ValueError, match="Invalid network"):
addresses.derive_address(TEST_XPUB_HEX, index=0, network="preprod")

View file

@ -0,0 +1,56 @@
"""CIP-25 v2 metadata envelope construction — pure unit tests, no network."""
from __future__ import annotations
from cardano_checkout.mint import build_cip25_metadata
def test_basic_envelope_shape() -> None:
md = build_cip25_metadata(
policy_id="abc123",
asset_name="ChromaticCraft-Order-0001",
name="Chromatic Craft — Custom Order #0001",
image_cid="bafybeibgen",
description="Hand-stitched moth pendant",
properties={"order_id": "0001", "edition": "1 of 1"},
)
assert md["721"]["version"] == "2.0"
assert "abc123" in md["721"]
nft = md["721"]["abc123"]["ChromaticCraft-Order-0001"]
assert nft["name"] == "Chromatic Craft — Custom Order #0001"
assert nft["image"] == "ipfs://bafybeibgen"
assert nft["mediaType"] == "image/jpeg"
assert nft["description"] == "Hand-stitched moth pendant"
assert nft["order_id"] == "0001"
assert nft["edition"] == "1 of 1"
def test_description_under_64_chars_stays_a_string() -> None:
md = build_cip25_metadata(
policy_id="abc", asset_name="x", name="n",
image_cid="c", description="short",
)
assert md["721"]["abc"]["x"]["description"] == "short"
def test_description_over_64_chars_chunks_to_list() -> None:
long = "x" * 150
md = build_cip25_metadata(
policy_id="abc", asset_name="x", name="n",
image_cid="c", description=long,
)
desc = md["721"]["abc"]["x"]["description"]
assert isinstance(desc, list)
assert all(len(chunk) <= 64 for chunk in desc)
assert "".join(desc) == long
def test_image_uri_has_ipfs_prefix() -> None:
md = build_cip25_metadata(
policy_id="abc", asset_name="x", name="n",
image_cid="bafybeitestcid",
)
assert md["721"]["abc"]["x"]["image"].startswith("ipfs://")
assert "bafybeitestcid" in md["721"]["abc"]["x"]["image"]

49
tests/test_invoice.py Normal file
View file

@ -0,0 +1,49 @@
"""Invoice dataclass + state machine tests."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from cardano_checkout.invoice import Invoice, InvoiceStatus
def _make() -> Invoice:
return Invoice(
id="inv_001",
merchant_id="chromaticcraft",
derivation_index=0,
receive_address="addr1...",
expected_lovelace=5_000_000, # 5 ADA
usd_amount=2.50,
)
def test_defaults_are_pending_and_non_terminal() -> None:
inv = _make()
assert inv.status == InvoiceStatus.PENDING
assert inv.is_terminal is False
assert inv.ada_amount == 5.0
def test_terminal_states() -> None:
for s in (InvoiceStatus.CONFIRMED, InvoiceStatus.EXPIRED, InvoiceStatus.CANCELLED):
inv = _make()
inv.status = s
assert inv.is_terminal is True
def test_is_expired_honors_expires_at() -> None:
past = datetime.now(timezone.utc) - timedelta(minutes=5)
future = datetime.now(timezone.utc) + timedelta(minutes=5)
inv = _make()
inv.expires_at = past
assert inv.is_expired() is True
inv.expires_at = future
assert inv.is_expired() is False
# Confirmed invoices are never "expired" regardless of timestamp
inv.expires_at = past
inv.status = InvoiceStatus.CONFIRMED
assert inv.is_expired() is False