diff --git a/tests/test_mint_metadata.py b/tests/test_mint_metadata.py new file mode 100644 index 0000000..ec4fb77 --- /dev/null +++ b/tests/test_mint_metadata.py @@ -0,0 +1,308 @@ +"""CIP-25 v2 envelope round-trips + unsigned-mint shape tests. + +Complements ``test_cip25_metadata.py`` (the pure builder unit tests) by +checking: + +- Round-tripping the envelope through pycardano's Metadata/AuxiliaryData + produces a well-formed CBOR blob (the wallet-visible thing). +- :func:`mint_nft_cert` returns a correctly-shaped :class:`UnsignedMint` + without hitting a live chain — we stub the ChainContext. + +The chain-context stub mirrors just enough of pycardano's interface for +``TransactionBuilder.build`` to succeed. No live Ogmios calls. +""" + +from __future__ import annotations + +import pytest + +from cardano_checkout import UnsignedMint, build_cip25_metadata, mint_nft_cert + + +# --------------------------------------------------------------------------- +# Fixtures — a deterministic test policy + a stub ChainContext +# --------------------------------------------------------------------------- + + +def _test_policy(): + """Return a fresh 2-of-2 NativeScript all-of policy + its hash + CBOR. + + Uses fixed verification-key hashes (32-char hex, 28 bytes as + required by Cardano VKH). No cryptographic significance — just + deterministic filler for tests. + """ + from pycardano import ( + NativeScript, + ScriptAll, + ScriptPubkey, + VerificationKeyHash, + ) + + vkh_cobb = VerificationKeyHash.from_primitive(bytes.fromhex("11" * 28)) + vkh_kayos = VerificationKeyHash.from_primitive(bytes.fromhex("22" * 28)) + script: NativeScript = ScriptAll( + [ScriptPubkey(vkh_cobb), ScriptPubkey(vkh_kayos)] + ) + return script, [vkh_cobb.payload.hex(), vkh_kayos.payload.hex()] + + +def _stub_context(): + """Return a minimal ChainContext stub with just enough surface for builder.build().""" + from pycardano import ( + Address, + AssetName, + MultiAsset, + ProtocolParameters, + TransactionId, + TransactionInput, + TransactionOutput, + UTxO, + Value, + ) + + class StubContext: + network = None + + @property + def last_block_slot(self) -> int: + return 100_000_000 + + @property + def protocol_param(self) -> ProtocolParameters: + # Mainnet values as of 2025-ish — just enough to get fee math through. + return ProtocolParameters( + min_fee_constant=155_381, + min_fee_coefficient=44, + max_block_size=90_112, + max_tx_size=16_384, + max_block_header_size=1_100, + key_deposit=2_000_000, + pool_deposit=500_000_000, + pool_influence=0.3, + monetary_expansion=0.003, + treasury_expansion=0.2, + decentralization_param=0, + extra_entropy="", + protocol_major_version=9, + protocol_minor_version=0, + min_utxo=1_000_000, + min_pool_cost=340_000_000, + price_mem=0.0577, + price_step=0.0000721, + max_tx_ex_mem=14_000_000, + max_tx_ex_steps=10_000_000_000, + max_block_ex_mem=62_000_000, + max_block_ex_steps=20_000_000_000, + max_val_size=5_000, + collateral_percent=150, + max_collateral_inputs=3, + coins_per_utxo_byte=4_310, + coins_per_utxo_word=34_482, + cost_models={}, # Plutus cost models — not used by native-script mints. + ) + + @property + def genesis_param(self): + # Minimal stand-in — pycardano's builder uses this for slot math. + from pycardano import GenesisParameters + + return GenesisParameters( + active_slots_coefficient=0.05, + update_quorum=5, + max_lovelace_supply=45_000_000_000_000_000, + network_magic=764_824_073, + epoch_length=432_000, + system_start=1_506_203_091, + slots_per_kes_period=129_600, + slot_length=1, + max_kes_evolutions=62, + security_param=2_160, + ) + + @property + def era(self): + from pycardano import Era + return Era.CONWAY + + def utxos(self, address): + # Provide one fat UTxO so the builder has inputs to draw fees + min-ADA from. + addr = ( + address + if isinstance(address, Address) + else Address.from_primitive(address) + ) + tx_in = TransactionInput( + transaction_id=TransactionId.from_primitive(bytes.fromhex("cc" * 32)), + index=0, + ) + # 1000 ADA, no native assets — plenty for fees + NFT min-UTxO. + output = TransactionOutput(addr, Value(1_000_000_000)) + return [UTxO(tx_in, output)] + + def submit_tx(self, tx): # pragma: no cover — not invoked in these tests + pass + + return StubContext() + + +# --------------------------------------------------------------------------- +# Test vectors for the envelope +# --------------------------------------------------------------------------- + + +def test_envelope_from_chromaticcraft_order_vector() -> None: + """A realistic chromaticcraft cert — image CID + studio properties.""" + md = build_cip25_metadata( + policy_id="4a3b2c1d0e9f8a7b6c5d4e3f2a1b0c9d8e7f6a5b4c3d2e1f0a9b8c7d", + asset_name="ChromaticCraftCert0042", + name="Chromatic Craft Cert #0042", + image_cid="bafybeihkop... real cid would be 59 base32 chars", + description=( + "Certificate of authenticity for hand-stitched custom moth pendant " + "ordered by Abby, completed 2026-04-17 by Cobb." + ), + media_type="image/png", + properties={ + "studio": "chromaticcraft", + "artisan": "Cobb", + "order_id": "CC-2026-0042", + "edition": "1 of 1", + "material": "sterling silver + polymer clay", + }, + ) + + label = md["721"] + nft = label[ + "4a3b2c1d0e9f8a7b6c5d4e3f2a1b0c9d8e7f6a5b4c3d2e1f0a9b8c7d" + ]["ChromaticCraftCert0042"] + + assert label["version"] == "2.0" + assert nft["name"] == "Chromatic Craft Cert #0042" + assert nft["mediaType"] == "image/png" + assert nft["studio"] == "chromaticcraft" + assert nft["artisan"] == "Cobb" + assert nft["edition"] == "1 of 1" + + +def test_envelope_roundtrips_through_pycardano_metadata() -> None: + """CBOR-encode and decode — the wallet-visible path.""" + from pycardano import AuxiliaryData, Metadata + + raw = build_cip25_metadata( + policy_id="ab" * 28, + asset_name="TestNFT", + name="Test NFT", + image_cid="bafybeitestcidforround-tripping", + description="round trip", + ) + # pycardano's Metadata expects int keys at the top level. + inner = {int(k): v for k, v in raw.items()} + md = Metadata(inner) + aux = AuxiliaryData(md) + + cbor_hex = aux.to_cbor_hex() + # Must be valid hex and non-trivially sized. + assert len(cbor_hex) > 40 + assert all(c in "0123456789abcdef" for c in cbor_hex) + + # Decoding back must yield the same metadata. + decoded = AuxiliaryData.from_cbor(bytes.fromhex(cbor_hex)) + decoded_md = decoded.data if hasattr(decoded, "data") else decoded # pycardano compat + assert decoded_md is not None + + +def test_envelope_version_key_always_present() -> None: + md = build_cip25_metadata( + policy_id="a" * 56, asset_name="x", name="n", image_cid="c" + ) + assert md["721"]["version"] == "2.0" + + +# --------------------------------------------------------------------------- +# mint_nft_cert — full tx body construction against a stubbed context +# --------------------------------------------------------------------------- + + +async def test_mint_nft_cert_returns_unsigned_bundle() -> None: + from pycardano import Address, Network, PaymentKeyPair, StakeKeyPair + + script, signer_hashes = _test_policy() + policy_cbor_hex = script.to_cbor_hex() + # policy_id is blake2b-224 of the script CBOR — use pycardano to compute. + policy_id = script.hash().payload.hex() + + # Build two throwaway addresses in testnet namespace. + pay_key = PaymentKeyPair.generate() + stk_key = StakeKeyPair.generate() + funding_addr = str( + Address( + payment_part=pay_key.verification_key.hash(), + staking_part=stk_key.verification_key.hash(), + network=Network.TESTNET, + ) + ) + recipient_pay = PaymentKeyPair.generate() + recipient_stk = StakeKeyPair.generate() + recipient_addr = str( + Address( + payment_part=recipient_pay.verification_key.hash(), + staking_part=recipient_stk.verification_key.hash(), + network=Network.TESTNET, + ) + ) + + from cardano_checkout import MintPolicy + + policy = MintPolicy( + policy_id=policy_id, + script_cbor_hex=policy_cbor_hex, + required_signer_hashes=signer_hashes, + ) + + metadata = build_cip25_metadata( + policy_id=policy_id, + asset_name="TestCert01", + name="Test Cert 01", + image_cid="bafybeitest", + ) + + result = await mint_nft_cert( + policy=policy, + asset_name="TestCert01", + metadata=metadata, + recipient_address=recipient_addr, + funding_address=funding_addr, + context=_stub_context(), + network="testnet", + ) + + assert isinstance(result, UnsignedMint) + assert len(result.tx_id) == 64 # hex-encoded 32-byte blake2b hash + assert result.tx_body_cbor_hex + assert all(c in "0123456789abcdef" for c in result.tx_body_cbor_hex) + assert result.auxiliary_data_cbor_hex + assert result.native_script_cbor_hex == policy_cbor_hex + assert result.required_signer_hashes == signer_hashes + assert policy_id in result.summary + assert recipient_addr in result.summary + + +async def test_mint_nft_cert_rejects_oversize_asset_name() -> None: + from cardano_checkout import MintPolicy + + policy = MintPolicy( + policy_id="aa" * 28, + script_cbor_hex="82008200581c" + "11" * 28, # doesn't matter — builder never reached + required_signer_hashes=[], + ) + + with pytest.raises(ValueError, match="asset_name"): + await mint_nft_cert( + policy=policy, + asset_name="X" * 33, # 33 > 32 byte limit + metadata={"721": {}}, + recipient_address="addr_test1...", + funding_address="addr_test1...", + context=_stub_context(), + network="testnet", + ) diff --git a/tests/test_monitor_with_inmemory_store.py b/tests/test_monitor_with_inmemory_store.py new file mode 100644 index 0000000..0c1507c --- /dev/null +++ b/tests/test_monitor_with_inmemory_store.py @@ -0,0 +1,247 @@ +"""Monitor integration tests against InMemoryStore. + +Stubs both Koios (via monkeypatch on ``check_address_utxos``) and the +ADA/USD oracle so the tests are deterministic and offline. Exercises +the main state transitions the scheduler cares about: + +- PENDING → CONFIRMED when UTxOs satisfy the 98% threshold +- PENDING → OVERPAID when UTxOs exceed the 102% threshold +- PENDING → UNDERPAID when UTxOs are nonzero but below threshold +- PENDING (kept) when no UTxOs yet +- Reprice of an expired invoice → new expected_lovelace + new expires_at +- Reprice hitting max_repricings → EXPIRED +""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone + +import pytest + +from cardano_checkout import InMemoryStore, Invoice, InvoiceStatus +from cardano_checkout import monitor + + +def _make( + id_: str = "inv", + expected_lovelace: int = 5_000_000, + expires_in_minutes: int = 15, + status: InvoiceStatus = InvoiceStatus.PENDING, + repriced_count: int = 0, +) -> Invoice: + now = datetime.now(timezone.utc) + return Invoice( + id=id_, + merchant_id="chromaticcraft", + derivation_index=0, + receive_address="addr1testreceive", + expected_lovelace=expected_lovelace, + usd_amount=2.50, + status=status, + expires_at=now + timedelta(minutes=expires_in_minutes), + metadata={"repriced_count": repriced_count} if repriced_count else {}, + ) + + +def _utxo(lovelace: int, tx_hash: str = "aa" * 32) -> dict: + return { + "tx_hash": tx_hash, + "tx_index": 0, + "value": str(lovelace), + "asset_list": [], + } + + +@pytest.fixture(autouse=True) +def _patch_koios_and_oracle(monkeypatch): + """Default: no UTxOs, oracle returns $0.45/ADA. Individual tests override.""" + + async def fake_utxos(address, koios_url=None, timeout=None): + return [] + + async def fake_price(): + return 0.45 + + async def fake_convert(usd): + if usd <= 0: + return 0 + return int((usd / 0.45) * 1_000_000) + + monkeypatch.setattr(monitor, "check_address_utxos", fake_utxos) + monkeypatch.setattr(monitor, "get_ada_usd_price", fake_price) + monkeypatch.setattr(monitor, "convert_usd_to_lovelace", fake_convert) + + +# --------------------------------------------------------------------------- +# check_pending_invoices +# --------------------------------------------------------------------------- + + +async def test_no_utxos_leaves_invoice_pending() -> None: + store = InMemoryStore() + await store.create(_make()) + + updated = await monitor.check_pending_invoices(store) + assert updated == 0 + + inv = await store.get("inv") + assert inv is not None + assert inv.status == InvoiceStatus.PENDING + assert inv.received_lovelace == 0 + + +async def test_confirm_within_tolerance(monkeypatch) -> None: + store = InMemoryStore() + await store.create(_make(expected_lovelace=5_000_000)) + + async def fake_utxos(address, koios_url=None, timeout=None): + # 4.9 ADA — right at the 98% confirm threshold. + return [_utxo(4_900_000)] + + monkeypatch.setattr(monitor, "check_address_utxos", fake_utxos) + + updated = await monitor.check_pending_invoices(store) + assert updated == 1 + + inv = await store.get("inv") + assert inv is not None + assert inv.status == InvoiceStatus.CONFIRMED + assert inv.received_lovelace == 4_900_000 + assert inv.confirmed_at is not None + assert inv.tx_hashes == ["aa" * 32] + + +async def test_overpaid_flag_when_above_threshold(monkeypatch) -> None: + store = InMemoryStore() + await store.create(_make(expected_lovelace=5_000_000)) + + async def fake_utxos(address, koios_url=None, timeout=None): + return [_utxo(5_200_000)] + + monkeypatch.setattr(monitor, "check_address_utxos", fake_utxos) + + await monitor.check_pending_invoices(store) + inv = await store.get("inv") + assert inv is not None + assert inv.status == InvoiceStatus.OVERPAID + assert inv.confirmed_at is not None + + +async def test_underpaid_when_below_tolerance(monkeypatch) -> None: + store = InMemoryStore() + await store.create(_make(expected_lovelace=5_000_000)) + + async def fake_utxos(address, koios_url=None, timeout=None): + return [_utxo(3_000_000)] + + monkeypatch.setattr(monitor, "check_address_utxos", fake_utxos) + + await monitor.check_pending_invoices(store) + inv = await store.get("inv") + assert inv is not None + assert inv.status == InvoiceStatus.UNDERPAID + assert inv.received_lovelace == 3_000_000 + assert inv.confirmed_at is None + + +async def test_record_tx_called_for_observed_hashes(monkeypatch) -> None: + store = InMemoryStore() + await store.create(_make(expected_lovelace=5_000_000)) + + async def fake_utxos(address, koios_url=None, timeout=None): + return [_utxo(4_900_000, tx_hash="feed" + "00" * 30)] + + monkeypatch.setattr(monitor, "check_address_utxos", fake_utxos) + + await monitor.check_pending_invoices(store) + + records = store._tx_records() + assert any(k[0] == "inv" and k[1] == "feed" + "00" * 30 for k in records) + + +async def test_already_expired_invoices_are_skipped(monkeypatch) -> None: + """Invoices past their expiry are left for reprice_expired_invoices to handle.""" + store = InMemoryStore() + expired = _make(expected_lovelace=5_000_000) + expired.expires_at = datetime.now(timezone.utc) - timedelta(minutes=1) + await store.create(expired) + + calls = [] + + async def fake_utxos(address, koios_url=None, timeout=None): + calls.append(address) + return [_utxo(5_000_000)] + + monkeypatch.setattr(monitor, "check_address_utxos", fake_utxos) + + await monitor.check_pending_invoices(store) + # check_pending_invoices must not have polled Koios for an already-expired invoice. + assert calls == [] + + +# --------------------------------------------------------------------------- +# reprice_expired_invoices +# --------------------------------------------------------------------------- + + +async def test_reprice_updates_expected_lovelace_and_extends_expiry() -> None: + store = InMemoryStore() + inv = _make(expected_lovelace=5_000_000) + inv.expires_at = datetime.now(timezone.utc) - timedelta(minutes=1) + await store.create(inv) + + updated = await monitor.reprice_expired_invoices( + store, window_minutes=15, max_repricings=3 + ) + assert updated == 1 + + fetched = await store.get("inv") + assert fetched is not None + # USD $2.50 @ $0.45/ADA = 5.555 ADA = 5555555 lovelace. + assert fetched.expected_lovelace == 5_555_555 + assert fetched.status == InvoiceStatus.PENDING + assert fetched.expires_at is not None + assert fetched.expires_at > datetime.now(timezone.utc) + assert fetched.metadata["repriced_count"] == 1 + + +async def test_reprice_gives_up_after_max_repricings() -> None: + store = InMemoryStore() + inv = _make(expected_lovelace=5_000_000, repriced_count=3) + inv.expires_at = datetime.now(timezone.utc) - timedelta(minutes=1) + await store.create(inv) + + await monitor.reprice_expired_invoices( + store, window_minutes=15, max_repricings=3 + ) + + fetched = await store.get("inv") + assert fetched is not None + assert fetched.status == InvoiceStatus.EXPIRED + + +async def test_reprice_noop_when_nothing_expired() -> None: + store = InMemoryStore() + await store.create(_make(expired_in_minutes=15) if False else _make()) + + updated = await monitor.reprice_expired_invoices(store) + assert updated == 0 + + +async def test_reprice_skips_when_oracle_unavailable(monkeypatch) -> None: + store = InMemoryStore() + inv = _make(expected_lovelace=5_000_000) + inv.expires_at = datetime.now(timezone.utc) - timedelta(minutes=1) + await store.create(inv) + + async def zero_price(): + return 0.0 + + monkeypatch.setattr(monitor, "get_ada_usd_price", zero_price) + + updated = await monitor.reprice_expired_invoices(store) + assert updated == 0 + + fetched = await store.get("inv") + assert fetched is not None + assert fetched.status == InvoiceStatus.PENDING # not flipped to expired diff --git a/tests/test_store_protocol.py b/tests/test_store_protocol.py new file mode 100644 index 0000000..8a94bdd --- /dev/null +++ b/tests/test_store_protocol.py @@ -0,0 +1,180 @@ +"""InvoiceStore protocol conformance + InMemoryStore round-trips. + +These tests exist to catch regressions in the reference implementation +and to document the exact semantics the monitor + scheduler rely on. +Any backend consumers implement should pass the same suite (we don't +parameterize yet — that lands when we ship the SQLAlchemy adapter). +""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone + +import pytest + +from cardano_checkout import InMemoryStore, Invoice, InvoiceStatus, InvoiceStore + + +def _make_invoice( + id_: str = "inv_001", + merchant: str = "chromaticcraft", + index: int = 0, + status: InvoiceStatus = InvoiceStatus.PENDING, +) -> Invoice: + return Invoice( + id=id_, + merchant_id=merchant, + derivation_index=index, + receive_address=f"addr1...{index}", + expected_lovelace=5_000_000, + usd_amount=2.50, + status=status, + expires_at=datetime.now(timezone.utc) + timedelta(minutes=15), + ) + + +def test_inmemory_store_satisfies_protocol() -> None: + """Runtime isinstance check against the Protocol — catches API drift.""" + assert isinstance(InMemoryStore(), InvoiceStore) + + +async def test_create_then_get_round_trips() -> None: + store = InMemoryStore() + inv = _make_invoice() + + await store.create(inv) + fetched = await store.get(inv.id) + + assert fetched is not None + assert fetched.id == inv.id + assert fetched.merchant_id == inv.merchant_id + assert fetched.expected_lovelace == 5_000_000 + assert fetched.status == InvoiceStatus.PENDING + + +async def test_create_rejects_duplicate_id() -> None: + store = InMemoryStore() + inv = _make_invoice() + await store.create(inv) + + with pytest.raises(ValueError, match="already exists"): + await store.create(_make_invoice()) + + +async def test_get_missing_returns_none() -> None: + store = InMemoryStore() + assert await store.get("does-not-exist") is None + + +async def test_update_persists_state_changes() -> None: + store = InMemoryStore() + inv = _make_invoice() + await store.create(inv) + + inv.status = InvoiceStatus.CONFIRMED + inv.received_lovelace = 5_100_000 + inv.confirmed_at = datetime.now(timezone.utc) + await store.update(inv) + + fetched = await store.get(inv.id) + assert fetched is not None + assert fetched.status == InvoiceStatus.CONFIRMED + assert fetched.received_lovelace == 5_100_000 + assert fetched.confirmed_at is not None + + +async def test_update_on_missing_raises() -> None: + store = InMemoryStore() + with pytest.raises(KeyError): + await store.update(_make_invoice(id_="never-created")) + + +async def test_get_is_defensive_copy() -> None: + """Mutating a fetched invoice must not change stored state.""" + store = InMemoryStore() + inv = _make_invoice() + await store.create(inv) + + fetched = await store.get(inv.id) + assert fetched is not None + fetched.status = InvoiceStatus.CANCELLED + fetched.metadata["tampered"] = True + + # Re-fetch — store should still have the original. + refetched = await store.get(inv.id) + assert refetched is not None + assert refetched.status == InvoiceStatus.PENDING + assert "tampered" not in refetched.metadata + + +async def test_list_by_status_returns_only_matching() -> None: + store = InMemoryStore() + await store.create(_make_invoice(id_="p1", index=0)) + await store.create(_make_invoice(id_="p2", index=1)) + await store.create( + _make_invoice(id_="c1", index=2, status=InvoiceStatus.CONFIRMED) + ) + + pending = await store.list_by_status(InvoiceStatus.PENDING) + confirmed = await store.list_by_status(InvoiceStatus.CONFIRMED) + expired = await store.list_by_status(InvoiceStatus.EXPIRED) + + assert {inv.id for inv in pending} == {"p1", "p2"} + assert {inv.id for inv in confirmed} == {"c1"} + assert expired == [] + + +async def test_list_by_status_honours_limit() -> None: + store = InMemoryStore() + for i in range(5): + await store.create(_make_invoice(id_=f"p{i}", index=i)) + + results = await store.list_by_status(InvoiceStatus.PENDING, limit=3) + assert len(results) == 3 + + +async def test_next_derivation_index_is_monotonic_per_merchant() -> None: + store = InMemoryStore() + m1 = "chromaticcraft" + m2 = "tradecraft" + + assert await store.next_derivation_index(m1) == 0 + assert await store.next_derivation_index(m1) == 1 + assert await store.next_derivation_index(m2) == 0 # independent per merchant + assert await store.next_derivation_index(m1) == 2 + + +async def test_create_bumps_index_cursor_if_higher() -> None: + """If a caller creates an invoice at index N, the next derivation MUST skip past it.""" + store = InMemoryStore() + await store.create(_make_invoice(id_="manual", index=7)) + + nxt = await store.next_derivation_index("chromaticcraft") + assert nxt == 8 + + +async def test_record_tx_is_idempotent() -> None: + store = InMemoryStore() + inv = _make_invoice() + await store.create(inv) + + await store.record_tx(inv.id, "deadbeef", 5_000_000) + await store.record_tx(inv.id, "deadbeef", 5_000_000) + + fetched = await store.get(inv.id) + assert fetched is not None + # tx_hashes list should contain the hash once, not twice. + assert fetched.tx_hashes.count("deadbeef") == 1 + + +async def test_record_tx_appends_multiple_distinct_hashes() -> None: + store = InMemoryStore() + inv = _make_invoice() + await store.create(inv) + + await store.record_tx(inv.id, "aaaa", 1_000_000) + await store.record_tx(inv.id, "bbbb", 2_000_000) + + fetched = await store.get(inv.id) + assert fetched is not None + assert set(fetched.tx_hashes) == {"aaaa", "bbbb"}