cardano-api/main.py
Cobb Hayes aa8879bc69 Public-flip audit: drop audit-ticket prefixes + topology refs + AI scaffolding
cardano-api: strip 'Fix #N:' audit-ticket prefixes from inline comments (was
50+ in main.py), drop hardening-pass changelog blocks from module docstring,
rewrite README to drop deploy paths + marketing sections, keep tier/auth/TTL
+ policy IDs.

cardano-checkout-py: drop TradeCraft lineage refs, swap chromaticcraft/tradecraft
test fixtures for acme/globex, repository URL → git.sulkta.com.
2026-05-27 11:15:02 -07:00

1996 lines
68 KiB
Python

"""
cardano-api — REST API over cardano-db-sync + cardano-node.
Reads hit the db-sync Postgres directly. UTxO queries, protocol params, and
tx submit shell out to cardano-cli against a local node socket. Auth is
TRP-token-gated via CIP-8 wallet signatures; keys are stored as sha256(key).
Tiers:
    anonymous   0 TRP       20 req/min     db-sync only, no node access
    standard    >=50 TRP    100 req/min    + node reads (utxos, protocol-params)
    elevated    >=500 TRP   1000 req/min   + POST /v1/tx/submit
    master      env key     unlimited      everything
Node endpoints return 403 for insufficient tier.
Known policy IDs:
TRP 9c4bd4a90cdb73d9ff681215ecf7dea9fb183d916d30487d17098e05
MAP 24bd9e7b9ae3a61df79eca72fd8355d0f7767e4c55a04a0d919c019c
"""
import os
import re
import json
import hashlib
import secrets
import time
import logging
import asyncio
import subprocess
import tempfile
import base64
from datetime import datetime, timezone, timedelta
from typing import Optional
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException, Query, Header, Depends, BackgroundTasks
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from pydantic import BaseModel, Field
import asyncpg
from asyncpg.exceptions import UndefinedTableError, PostgresError
import redis.asyncio as redis
import cbor2
# Configure logging: INFO level, timestamped single-line records for the whole process.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Environment config — every value is read once at import time; defaults are
# hostnames/ports used when the variable is unset.
DB_HOST = os.getenv("DB_HOST", "postgres-dbsync")
DB_PORT = int(os.getenv("DB_PORT", "5432"))
DB_NAME = os.getenv("DB_NAME", "cexplorer")
DB_USER = os.getenv("DB_USER", "dbsync")
DB_PASS = os.getenv("DB_PASS", "")
REDIS_HOST = os.getenv("REDIS_HOST", "redis-api")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
# Empty by default. The auth code only compares a *non-empty* presented key
# against this value, so an unset master key never matches.
API_MASTER_KEY = os.getenv("API_MASTER_KEY", "")
# Exported to cardano-cli via its environment by run_cardano_cli().
CARDANO_NODE_SOCKET_PATH = os.getenv("CARDANO_NODE_SOCKET_PATH", "/node-ipc/node.socket")
# NOTE(review): the cardano-cli invocations visible in this file pass a
# hard-coded "--mainnet" flag; confirm whether this setting is consumed elsewhere.
CARDANO_NETWORK = os.getenv("CARDANO_NETWORK", "mainnet")
# Known policy IDs (56 hex chars each)
TRP_POLICY_ID = "9c4bd4a90cdb73d9ff681215ecf7dea9fb183d916d30487d17098e05"
MAP_POLICY_ID = "24bd9e7b9ae3a61df79eca72fd8355d0f7767e4c55a04a0d919c019c"
# TRP Tier thresholds: minimum TRP balance required for each tier.
TRP_TIERS = {
"elevated": 500,
"standard": 50,
"anonymous": 0
}
# Rate limits (requests per minute), keyed by tier.
RATE_LIMITS = {
"anonymous": 20,
"standard": 100,
"elevated": 1000,
"master": float("inf")
}
# TX Submit rate limits (per minute), keyed by tier. A limit of 0 means the
# tier is blocked outright by check_rate_limit().
TX_SUBMIT_LIMITS = {
"anonymous": 0, # blocked
"standard": 2,
"elevated": 10,
"master": float("inf")
}
# Cache TTLs (seconds), keyed by the kind of response being cached.
CACHE_TTLS = {
"balance": 60,
"tokens": 60,
"transactions": 30,
"block_latest": 10,
"tx_details": 300,
"asset_info": 120,
"pool_info": 120,
"sync_status": 5,
"protocol_params": 300, # 5 min cache for protocol params
"utxos": 10 # Short cache for UTxOs
}
# TRP-gated key expiry (48 hours)
TRP_KEY_EXPIRY_HOURS = 48
# Proxies whose X-Forwarded-For we trust. Loopback + docker default bridges
# cover the standard compose deploy; override for any other front-end.
TRUSTED_PROXIES = {"127.0.0.1", "::1", "172.22.0.1", "172.17.0.1"}
# Input validation regexes — gate all path params before any DB query.
# Addresses: bech32-style prefix followed by 50-120 lowercase alphanumerics.
ADDR_RE = re.compile(r'^addr1[a-z0-9]{50,120}$')
ADDR_TEST_RE = re.compile(r'^addr_test1[a-z0-9]{50,120}$')
# Exactly 64 / 56 hex characters respectively (either case).
HEX64_RE = re.compile(r'^[a-fA-F0-9]{64}$')
POLICY_RE = re.compile(r'^[a-fA-F0-9]{56}$')
# Global connections — assigned in lifespan() at startup; None until then.
db_pool: Optional[asyncpg.Pool] = None
redis_client: Optional[redis.Redis] = None
# In-process cache for protocol parameters: "expires" is a time.time() deadline.
protocol_params_cache: dict = {"data": None, "expires": 0}
# Async lock for protocol params cache to prevent stampede.
_params_lock = asyncio.Lock()
# ============ Input Validation ============
def validate_address(address: str) -> bool:
    """Return True when *address* matches the mainnet or testnet address pattern."""
    for pattern in (ADDR_RE, ADDR_TEST_RE):
        if pattern.match(address):
            return True
    return False
def validate_tx_hash(tx_hash: str) -> bool:
    """
    Validate transaction hash format (64 hex chars, optional ``0x`` prefix).

    Only a *leading* ``0x``/``0X`` is stripped. Removing every occurrence
    (as ``str.replace`` would) lets malformed input such as
    ``"ab0x" + 62 hex chars`` pass validation even though the raw string is
    not a usable hash.
    """
    clean = tx_hash.lower().removeprefix("0x")
    return bool(HEX64_RE.match(clean))
def validate_policy_id(policy_id: str) -> bool:
    """
    Validate policy ID format (56 hex chars, optional ``0x`` prefix).

    Only a *leading* ``0x``/``0X`` is stripped; an embedded ``0x`` makes the
    value invalid instead of being silently removed.
    """
    clean = policy_id.lower().removeprefix("0x")
    return bool(POLICY_RE.match(clean))
# ============ Helper Functions ============
def hash_api_key(key: str) -> str:
    """Return the hex SHA-256 digest of *key*; this digest is what gets stored and looked up."""
    digest = hashlib.sha256()
    digest.update(key.encode())
    return digest.hexdigest()
def run_cardano_cli(args: list[str], timeout: int = 30) -> tuple[bool, str, str]:
    """
    Invoke ``cardano-cli`` with *args* and report the outcome.

    The child process inherits the current environment with
    CARDANO_NODE_SOCKET_PATH forced to the configured socket path.

    Returns:
        ``(success, stdout, stderr)`` — ``success`` is True only for exit
        code 0. Failures to launch or complete the command are reported as
        ``(False, "", <short message>)`` rather than raised.
    """
    command = ["cardano-cli"]
    command.extend(args)
    child_env = dict(os.environ)
    child_env["CARDANO_NODE_SOCKET_PATH"] = CARDANO_NODE_SOCKET_PATH

    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=timeout,
            env=child_env,
        )
    except subprocess.TimeoutExpired:
        return False, "", "Command timed out"
    except FileNotFoundError:
        return False, "", "cardano-cli not found"
    except Exception as e:
        # Log the detail server-side; callers only get a generic message.
        logger.error(f"cardano-cli error: {e}")
        return False, "", "Node command failed"

    succeeded = completed.returncode == 0
    return succeeded, completed.stdout, completed.stderr
def decode_hex_or_base64(data: str) -> bytes:
    """
    Decode *data* as hex (optionally ``0x``-prefixed) or, failing that, base64.

    Hex is tried first, so a string that is valid in both encodings is
    treated as hex. Base64 is decoded strictly: characters outside the
    base64 alphabet are an error instead of being silently dropped (the
    non-strict decoder would turn garbage into "valid" bytes). Whitespace
    and line breaks are tolerated in both encodings.

    Raises:
        ValueError: if *data* is neither valid hex nor valid base64.
    """
    candidate = data.strip()

    hex_part = candidate[2:] if candidate.startswith(("0x", "0X")) else candidate
    try:
        return bytes.fromhex(hex_part)
    except ValueError:
        pass

    # Strip embedded whitespace so line-wrapped base64 still decodes under
    # validate=True.
    compact = "".join(candidate.split())
    try:
        return base64.b64decode(compact, validate=True)
    except ValueError:
        # binascii.Error (bad characters / padding) is a ValueError subclass.
        raise ValueError("Invalid hex or base64 encoding") from None
def get_tier_from_trp_balance(balance: int) -> str:
    """Map a TRP balance to the highest tier whose threshold it meets."""
    # Checked from the highest threshold down; anything below "standard" is anonymous.
    for tier_name in ("elevated", "standard"):
        if balance >= TRP_TIERS[tier_name]:
            return tier_name
    return "anonymous"
# ============ CIP-8 Signature Verification ============
def verify_cip8_signature(address: str, nonce: str, signature_hex: str, key_hex: str) -> bool:
    """
    Verify a CIP-8 message signature over a challenge nonce.

    CIP-8 spec: https://cips.cardano.org/cip/CIP-8
    The signature is a COSE_Sign1 structure
    ``[protected, unprotected, payload, signature]`` (optionally CBOR-tagged).

    Checks performed, in order:
      1. *key_hex* is a 32-byte Ed25519 public key whose blake2b-224 hash
         equals the payment credential of *address*.
      2. The COSE_Sign1 structure is well-formed and its protected header
         declares EdDSA (alg = -8).
      3. The payload is a non-empty byte string equal to *nonce*, either as
         raw UTF-8 text or as the bytes whose hex encoding is *nonce*.
      4. The Ed25519 signature over the COSE ``Sig_structure`` is valid.

    Args:
        address: bech32 Cardano address the caller claims to control.
        nonce: challenge string previously issued for this address.
        signature_hex: hex-encoded COSE_Sign1 bytes.
        key_hex: hex-encoded public key used for signing.

    Returns:
        True only if every check passes. Any malformed input, missing
        dependency, or unexpected error yields False (and a log entry) —
        this function never raises.
    """
    try:
        from pycardano import Address, VerificationKey
        from pycardano.hash import blake2b_224

        # Decode the key (should be a 32-byte Ed25519 public key)
        key_bytes = bytes.fromhex(key_hex)
        if len(key_bytes) != 32:
            logger.warning(f"CIP-8 verification failed: invalid key length {len(key_bytes)}")
            return False
        # Constructed only so that pycardano can reject a key it considers
        # malformed; the object itself is not needed afterwards.
        VerificationKey.from_primitive(key_bytes)

        # Verify the public key hashes to the address' payment credential.
        addr = Address.from_primitive(address)
        if addr.payment_part is None:
            logger.warning("Address has no payment part")
            return False
        expected_keyhash = addr.payment_part.payload
        actual_keyhash = blake2b_224(key_bytes)
        if expected_keyhash != actual_keyhash:
            logger.warning("Key hash mismatch")
            return False

        # Decode the COSE_Sign1 structure; unwrap a CBOR tag if present.
        cose_sign1 = cbor2.loads(bytes.fromhex(signature_hex))
        if not isinstance(cose_sign1, (list, tuple)) or len(cose_sign1) != 4:
            if hasattr(cose_sign1, 'value'):
                cose_sign1 = cose_sign1.value
            if not isinstance(cose_sign1, (list, tuple)) or len(cose_sign1) != 4:
                logger.warning("Invalid COSE_Sign1 structure")
                return False
        protected, unprotected, payload, sig = cose_sign1
        if not isinstance(protected, bytes) or not isinstance(sig, bytes):
            logger.warning("CIP-8 verification failed: invalid COSE_Sign1 element types")
            return False

        # Protected header must declare EdDSA (alg = -8).
        try:
            protected_decoded = cbor2.loads(protected)
        except Exception:
            logger.warning("CIP-8 verification failed: could not decode protected header")
            return False
        if not isinstance(protected_decoded, dict):
            logger.warning("CIP-8 verification failed: protected header is not a map")
            return False
        alg = protected_decoded.get(1)  # Key 1 = algorithm in COSE header map
        if alg != -8:  # -8 = EdDSA per COSE spec
            logger.warning(f"CIP-8 verification failed: invalid algorithm {alg}, expected -8 (EdDSA)")
            return False

        # Empty payloads bypass the nonce check; reject.
        if not payload:
            logger.warning("CIP-8 verification rejected: empty payload")
            return False
        # The payload must be a byte string. Coercing other CBOR types with
        # bytes() is unsafe: bytes(<int>) allocates that many zero bytes, so
        # an attacker-chosen integer payload could exhaust memory.
        if not isinstance(payload, bytes):
            logger.warning("CIP-8 verification rejected: payload is not a byte string")
            return False

        # Payload is either the bytes whose hex form is the nonce, or the
        # nonce itself as UTF-8 text.
        nonce_matches = payload.hex() == nonce
        if not nonce_matches:
            try:
                nonce_matches = payload.decode('utf-8') == nonce
            except UnicodeDecodeError:
                nonce_matches = False
        if not nonce_matches:
            logger.warning("Payload doesn't match nonce")
            return False

        # Build the Sig_structure for verification
        # Sig_structure = ["Signature1", protected, external_aad, payload]
        external_aad = b""
        sig_structure = cbor2.dumps(["Signature1", protected, external_aad, payload])

        # Verify the Ed25519 signature
        from nacl.signing import VerifyKey
        from nacl.exceptions import BadSignature
        verify_key = VerifyKey(key_bytes)
        try:
            verify_key.verify(sig_structure, sig)
            return True
        except BadSignature:
            logger.warning("Bad signature")
            return False
    except ImportError as e:
        logger.error(f"Missing dependency for CIP-8 verification: {e}")
        return False
    except Exception as e:
        # Deliberate catch-all: malformed hex/CBOR/address input must result
        # in a failed verification, never an unhandled error.
        logger.error(f"CIP-8 verification error: {e}")
        return False
# ============ Lifespan ============
async def refresh_trp_tiers_task():
    """Long-running loop: sleep ten minutes, re-check TRP tiers, repeat until cancelled."""
    interval_seconds = 600
    while True:
        try:
            await asyncio.sleep(interval_seconds)
            await refresh_all_trp_tiers()
        except asyncio.CancelledError:
            # Cancellation (issued at shutdown) ends the loop quietly.
            return
        except Exception as e:
            # Any other failure is logged and the loop keeps going.
            logger.error(f"Error in TRP tier refresh task: {e}")
async def refresh_all_trp_tiers():
    """
    Check all TRP-gated keys and update tiers if balance changed.

    Key hashes come from the ``trp_gated_keys`` Redis set; each key's hash
    record is read individually, then all owner balances are fetched with a
    single batched Postgres query. A key's ``tier``/``trp_balance`` fields
    are rewritten only when the computed tier differs from the stored one.
    Set members whose key record no longer exists are pruned from the set.

    Does nothing if Redis or the DB pool is not initialised. A failure of
    the balance query is logged and aborts the refresh without changes.
    """
    if not redis_client or not db_pool:
        return
    key_hashes = await redis_client.smembers("trp_gated_keys")
    if not key_hashes:
        return

    # Collect all keys and their owners
    keys_data = []
    for key_hash in key_hashes:
        data = await redis_client.hgetall(f"apikey:{key_hash}")
        if not data:
            # Key was deleted but still in set - clean up
            await redis_client.srem("trp_gated_keys", key_hash)
            continue
        owner = data.get("owner")
        if owner and owner.startswith("addr"):
            keys_data.append({"key_hash": key_hash, "data": data, "owner": owner})

    # Order-preserving de-duplication in O(n); several keys may share an owner.
    addresses = list(dict.fromkeys(info["owner"] for info in keys_data))
    if not addresses:
        return

    # Batch all TRP balances in one Postgres call.
    try:
        async with db_pool.acquire() as conn:
            results = await conn.fetch("""
                SELECT txo.address, COALESCE(SUM(mto.quantity), 0) as quantity
                FROM ma_tx_out mto
                JOIN multi_asset ma ON ma.id = mto.ident
                JOIN tx_out txo ON txo.id = mto.tx_out_id
                LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
                WHERE txo.address = ANY($1::text[])
                AND ma.policy = decode($2, 'hex')
                AND txi.id IS NULL
                GROUP BY txo.address
            """, addresses, TRP_POLICY_ID)
        # Build address -> balance map; addresses with no rows hold 0 TRP.
        balance_map = {r["address"]: int(r["quantity"]) for r in results}
    except Exception as e:
        logger.error(f"Error batch-querying TRP balances: {e}")
        return

    # Update tiers for each key
    # NOTE(review): a key that expires (and is deleted) between the hgetall
    # above and the hset below would be re-created with only these two
    # fields and no expires_at — consider an existence check or a
    # transaction if that window matters.
    updated = 0
    for key_info in keys_data:
        owner = key_info["owner"]
        trp_balance = balance_map.get(owner, 0)
        new_tier = get_tier_from_trp_balance(trp_balance)
        old_tier = key_info["data"].get("tier", "anonymous")
        if new_tier == old_tier:
            continue
        await redis_client.hset(f"apikey:{key_info['key_hash']}", mapping={
            "tier": new_tier,
            "trp_balance": str(trp_balance)
        })
        logger.info(f"Updated tier for {owner}: {old_tier} -> {new_tier}")
        updated += 1
    if updated > 0:
        logger.info(f"TRP tier refresh complete: {updated} keys updated")
async def get_trp_balance(address: str) -> int:
    """Sum the unspent TRP token quantity held at *address*, read from db-sync."""
    balance_sql = """
        SELECT COALESCE(SUM(mto.quantity), 0) as quantity
        FROM ma_tx_out mto
        JOIN multi_asset ma ON ma.id = mto.ident
        JOIN tx_out txo ON txo.id = mto.tx_out_id
        LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
        WHERE txo.address = $1
        AND ma.policy = decode($2, 'hex')
        AND txi.id IS NULL
    """
    async with db_pool.acquire() as conn:
        row = await conn.fetchrow(balance_sql, address, TRP_POLICY_ID)
        if not row:
            return 0
        return int(row["quantity"])
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan: open the Postgres pool and Redis client, start the
    TRP tier refresh task, and tear all of it down again on shutdown.
    """
    global db_pool, redis_client

    # --- startup ---
    logger.info(f"Connecting to PostgreSQL at {DB_HOST}:{DB_PORT}/{DB_NAME}")
    pool_options = {
        "host": DB_HOST,
        "port": DB_PORT,
        "database": DB_NAME,
        "user": DB_USER,
        "password": DB_PASS,
        "min_size": 2,
        "max_size": 10,
    }
    db_pool = await asyncpg.create_pool(**pool_options)

    logger.info(f"Connecting to Redis at {REDIS_HOST}:{REDIS_PORT}")
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)

    # A missing socket is only a warning: db-sync-backed endpoints still work.
    socket_missing = not os.path.exists(CARDANO_NODE_SOCKET_PATH)
    if socket_missing:
        logger.warning(f"Cardano node socket NOT found at {CARDANO_NODE_SOCKET_PATH} - node endpoints will fail")
    else:
        logger.info(f"Cardano node socket found at {CARDANO_NODE_SOCKET_PATH}")

    tier_refresh_task = asyncio.create_task(refresh_trp_tiers_task())
    logger.info("Cardano API started successfully")

    yield

    # --- shutdown ---
    tier_refresh_task.cancel()
    try:
        await tier_refresh_task
    except asyncio.CancelledError:
        pass
    if db_pool:
        await db_pool.close()
    if redis_client:
        await redis_client.close()
# The ASGI application object. Startup/shutdown (DB pool, Redis client,
# background tier refresh) is delegated to lifespan().
app = FastAPI(
title="Cardano Chain Data API",
description="REST API for querying Cardano blockchain data via db-sync and cardano-node",
version="2.3.0",
lifespan=lifespan
)
# ============ Request Body Size Limit ============
class LimitBodySizeMiddleware(BaseHTTPMiddleware):
    """
    Cap /v1/tx/submit body size. Cardano max tx is ~16 KB; cap at 64 KB.

    The declared Content-Length is checked first as a cheap early reject.
    The body is then consumed chunk by chunk and the request is rejected as
    soon as the running total exceeds the cap, so chunked transfer or a
    missing/lying Content-Length cannot force the server to buffer an
    arbitrarily large body. All other paths pass through untouched.
    """
    MAX_TX_SIZE = 65536

    @staticmethod
    def _payload_too_large() -> JSONResponse:
        """Build the 413 response used for every over-limit rejection."""
        return JSONResponse(
            status_code=413,
            content={"error": "payload_too_large", "message": "Transaction exceeds maximum size of 64KB"}
        )

    async def dispatch(self, request: Request, call_next):
        if request.url.path != "/v1/tx/submit":
            return await call_next(request)

        content_length = request.headers.get("content-length")
        if content_length:
            try:
                declared_size = int(content_length)
            except ValueError:
                # A non-numeric header used to crash int() and surface as a 500.
                return JSONResponse(
                    status_code=400,
                    content={"error": "invalid_content_length", "message": "Content-Length header is not a valid integer"}
                )
            if declared_size > self.MAX_TX_SIZE:
                return self._payload_too_large()

        # Read the stream incrementally and stop at the first byte over the cap.
        chunks = []
        received = 0
        async for chunk in request.stream():
            received += len(chunk)
            if received > self.MAX_TX_SIZE:
                return self._payload_too_large()
            chunks.append(chunk)
        body = b"".join(chunks)

        # The stream can only be consumed once; make the buffered body
        # available to the route handler. `_body` is the cache that
        # Request.body() itself fills, and `_receive` replays the body for
        # code paths that read from the ASGI receive channel.
        request._body = body

        async def receive():
            return {"type": "http.request", "body": body, "more_body": False}

        request._receive = receive
        return await call_next(request)


app.add_middleware(LimitBodySizeMiddleware)
# ============ Exception Handlers ============
@app.exception_handler(UndefinedTableError)
async def handle_undefined_table(request: Request, exc: UndefinedTableError):
    """Translate a missing-table error (db-sync still initializing) into a 503 response."""
    logger.warning(f"Database not ready: {exc}")
    body = {
        "error": "db_sync_not_ready",
        "message": "Database sync in progress. Tables not yet created.",
    }
    return JSONResponse(status_code=503, content=body)
@app.exception_handler(PostgresError)
async def handle_postgres_error(request: Request, exc: PostgresError):
    """Translate any other Postgres error into a generic 503; the detail is logged, not returned."""
    logger.error(f"Database error: {exc}")
    body = {"error": "database_error", "message": "Internal database error"}
    return JSONResponse(status_code=503, content=body)
# ============ Models ============
# Request body describing an API key to create.
# NOTE(review): the endpoint consuming this model is outside the visible part
# of the file — presumably a master-key-only admin route; confirm.
class APIKeyCreate(BaseModel):
    label: str  # human-readable name for the key
    tier: str = "standard"  # standard or elevated
    owner: Optional[str] = None  # Cardano address for future TRP verification
    trp_balance: Optional[int] = 0  # For future TRP-gating
# Response body describing an API key record.
class APIKeyResponse(BaseModel):
    key: str  # presumably the raw key value (only its sha256 is stored) — TODO confirm against the endpoint
    label: str
    tier: str
    owner: Optional[str]  # no default: the field must be supplied, but may be None
    created_at: str
# Request body for POST /v1/auth/challenge: the address that wants a nonce.
class AuthChallengeRequest(BaseModel):
    address: str = Field(..., description="Cardano address (bech32)")
# Response body for POST /v1/auth/challenge.
class AuthChallengeResponse(BaseModel):
    nonce: str  # random hex string the wallet must sign
    expires_at: str  # ISO-8601 timestamp after which the nonce is no longer accepted
# Request body for POST /v1/auth/verify: the signed challenge.
class AuthVerifyRequest(BaseModel):
    address: str = Field(..., description="Cardano address (bech32)")
    nonce: str = Field(..., description="Nonce from challenge")
    signature: str = Field(..., description="CIP-8 signature (hex)")
    key: str = Field(..., description="Public key used for signing (hex)")
# Response body carrying a newly issued API key together with the tier and
# TRP balance it was issued for.
class AuthVerifyResponse(BaseModel):
    api_key: str
    tier: str
    trp_balance: int
# Request body for POST /v1/tx/submit.
class TxSubmitRequest(BaseModel):
    tx: str = Field(..., description="Signed transaction in hex or base64 CBOR encoding")
# Response body for a successful POST /v1/tx/submit.
class TxSubmitResponse(BaseModel):
    tx_hash: str  # hex blake2b-256 digest computed by the submit handler
    message: str
# ============ Helpers ============
async def get_api_key_info(key: str) -> Optional[dict]:
    """
    Look up an API key by sha256 hash. Enforces TRP-gated key expiry —
    expired keys are deleted on read and treated as missing.

    Expiry handling fails closed: a record whose ``expires_at`` cannot be
    parsed is rejected rather than accepted, and an expired key is rejected
    even if removing it from Redis fails. A timestamp without timezone
    information is interpreted as UTC. Records with no ``expires_at`` never
    expire.

    Returns:
        The key's stored fields, or None if Redis is not initialised, the
        key is unknown, expired, or has a malformed expiry.
    """
    if not redis_client:
        return None
    key_hash = hash_api_key(key)
    record_key = f"apikey:{key_hash}"
    data = await redis_client.hgetall(record_key)
    if not data:
        return None

    expires_at = data.get("expires_at")
    if not expires_at:
        return data

    try:
        expiry_time = datetime.fromisoformat(expires_at.replace('Z', '+00:00'))
    except (ValueError, TypeError, AttributeError) as e:
        # Never honour a key whose expiry we cannot evaluate.
        logger.error(f"Error parsing expires_at: {e}")
        return None
    if expiry_time.tzinfo is None:
        # Comparing naive and aware datetimes raises; assume UTC.
        expiry_time = expiry_time.replace(tzinfo=timezone.utc)

    if datetime.now(timezone.utc) <= expiry_time:
        return data

    logger.info(f"API key expired for owner {data.get('owner', 'unknown')}")
    try:
        await redis_client.delete(record_key)
        await redis_client.srem("trp_gated_keys", key_hash)
    except Exception as e:
        # Cleanup is best-effort; the key is rejected regardless.
        logger.error(f"Error removing expired API key: {e}")
    return None
async def check_rate_limit(identifier: str, tier: str, limit_type: str = "general") -> tuple[bool, int]:
    """
    Count one request against a per-minute Redis counter.

    Returns ``(allowed, retry_after_seconds)``. The master tier is never
    limited; a configured limit of 0 blocks the tier outright with a
    retry-after of 0.
    """
    if tier == "master":
        return True, 0

    table = TX_SUBMIT_LIMITS if limit_type == "tx_submit" else RATE_LIMITS
    # Unknown tiers fall back to the anonymous limit.
    limit = table[tier] if tier in table else table.get("anonymous", 0)
    if limit == 0:
        return False, 0  # Blocked entirely

    minute_bucket = int(time.time() // 60)
    counter_key = f"ratelimit:{limit_type}:{identifier}:{minute_bucket}"
    hits = await redis_client.incr(counter_key)
    if hits == 1:
        # First hit in this bucket: give the counter a one-minute lifetime.
        await redis_client.expire(counter_key, 60)

    if hits <= limit:
        return True, 0
    seconds_left = 60 - (int(time.time()) % 60)
    return False, seconds_left
async def get_cached(cache_key: str) -> Optional[dict]:
    """Fetch and JSON-decode the cached response for *cache_key*, or None on a miss."""
    raw = await redis_client.get(f"cache:{cache_key}")
    return json.loads(raw) if raw else None
async def set_cached(cache_key: str, data: dict, ttl: int):
    """Store *data* as JSON under *cache_key* with a lifetime of *ttl* seconds."""
    serialized = json.dumps(data)
    redis_key = f"cache:{cache_key}"
    await redis_client.setex(redis_key, ttl, serialized)
def get_client_ip(request: Request) -> str:
    """
    Extract client IP. X-Forwarded-For is only honoured from TRUSTED_PROXIES.

    When the connecting peer is a trusted proxy, the header is walked from
    right to left and the first hop that is *not* a trusted proxy is
    returned. The leftmost entry must not be used blindly: a proxy that
    appends to an existing header passes through whatever the client sent,
    so a client could otherwise pick its own rate-limit identity.

    Returns:
        The resolved client address; the connecting peer's address when the
        peer is not trusted or the header is absent/empty; ``"unknown"`` if
        the connection has no client information.
    """
    client_host = request.client.host if request.client else "unknown"
    if client_host not in TRUSTED_PROXIES:
        return client_host

    forwarded = request.headers.get("X-Forwarded-For")
    if not forwarded:
        return client_host

    hops = [hop.strip() for hop in forwarded.split(",") if hop.strip()]
    if not hops:
        return client_host
    for hop in reversed(hops):
        if hop not in TRUSTED_PROXIES:
            return hop
    # Every hop is a trusted proxy: fall back to the originating entry.
    return hops[0]
# ============ Auth Dependency ============
async def get_auth_context(
    request: Request,
    x_api_key: Optional[str] = Header(None),
    api_key: Optional[str] = Query(None)
) -> dict:
    """
    Extract authentication context from request.

    The key is taken from the ``X-API-Key`` header, falling back to the
    ``api_key`` query parameter.

    Returns:
        A dict with at least ``tier``, ``identifier`` and ``label``:
        - master key  -> tier "master";
        - known key   -> the stored tier, identified by the key's hash, plus
          ``owner`` and ``raw_key``;
        - otherwise   -> tier "anonymous", identified by client IP.
    """
    key = x_api_key or api_key
    client_ip = get_client_ip(request)
    if key:
        # Constant-time comparison so response timing does not leak how much
        # of the master key a guess got right. Bytes are compared because
        # compare_digest rejects non-ASCII str. An unset master key never matches.
        if API_MASTER_KEY and secrets.compare_digest(key.encode(), API_MASTER_KEY.encode()):
            return {"tier": "master", "identifier": "master", "label": "master"}
        key_info = await get_api_key_info(key)
        if key_info:
            return {
                "tier": key_info.get("tier", "standard"),
                "identifier": hash_api_key(key),
                "label": key_info.get("label", "unknown"),
                "owner": key_info.get("owner"),
                "raw_key": key  # needed by the refresh endpoint
            }
    return {"tier": "anonymous", "identifier": client_ip, "label": None}
async def require_master_key(auth: dict = Depends(get_auth_context)):
    """Dependency for admin endpoints: pass the auth context through only for the master tier."""
    is_master = auth["tier"] == "master"
    if is_master:
        return auth
    raise HTTPException(status_code=403, detail="Master key required")
async def require_standard_tier(auth: dict = Depends(get_auth_context)):
    """Dependency for node read endpoints: any tier except anonymous is accepted."""
    current_tier = auth["tier"]
    if current_tier != "anonymous":
        return auth
    denial = {
        "error": "tier_required",
        "message": "Node access requires standard tier or higher (50+ TRP)",
        "required_tier": "standard",
        "current_tier": current_tier,
    }
    raise HTTPException(status_code=403, detail=denial)
async def require_elevated_tier(auth: dict = Depends(get_auth_context)):
    """Dependency for tx submission: only the elevated and master tiers are accepted."""
    current_tier = auth["tier"]
    if current_tier in ("elevated", "master"):
        return auth
    denial = {
        "error": "tier_required",
        "message": "Transaction submission requires elevated tier (500+ TRP)",
        "required_tier": "elevated",
        "current_tier": current_tier,
    }
    raise HTTPException(status_code=403, detail=denial)
# ============ Middleware ============
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """
    Rate limiting and request logging middleware.

    Resolves the caller's tier from the ``X-API-Key`` header or ``api_key``
    query parameter (anonymous callers are identified by client IP), applies
    the general per-minute limit, and logs method, path, caller label/IP,
    latency and status for every request that is let through. ``/health``
    bypasses both limiting and logging.
    """
    start_time = time.time()
    # Skip rate limiting for health check
    if request.url.path == "/health":
        return await call_next(request)

    # Get auth context
    key = request.headers.get("X-API-Key") or request.query_params.get("api_key")
    client_ip = get_client_ip(request)
    tier = "anonymous"
    identifier = client_ip
    label = None
    if key:
        # Constant-time comparison so response timing does not leak how much
        # of the master key a guess got right. Bytes are compared because
        # compare_digest rejects non-ASCII str. An unset master key never matches.
        if API_MASTER_KEY and secrets.compare_digest(key.encode(), API_MASTER_KEY.encode()):
            tier = "master"
            identifier = "master"
            label = "master"
        else:
            key_info = await get_api_key_info(key)
            if key_info:
                tier = key_info.get("tier", "standard")
                identifier = hash_api_key(key)
                label = key_info.get("label", "unknown")

    # Check rate limit (general)
    allowed, retry_after = await check_rate_limit(identifier, tier)
    if not allowed:
        logger.warning(f"Rate limit exceeded: {identifier[:16]}... ({tier})")
        return JSONResponse(
            status_code=429,
            content={"error": "rate_limit_exceeded", "retry_after": retry_after}
        )

    # Process request
    response = await call_next(request)

    # Log request (don't log full key hash for security)
    elapsed = int((time.time() - start_time) * 1000)
    log_id = label or client_ip
    logger.info(f"{request.method} {request.url.path} | {log_id} | {elapsed}ms | {response.status_code}")
    return response
# ============ Health & Sync ============
@app.get("/health")
async def health_check():
    """Liveness probe: always answers with a static OK payload."""
    payload = {"status": "ok"}
    return payload
@app.get("/v1/sync/status")
async def get_sync_status(auth: dict = Depends(get_auth_context)):
    """
    Get db-sync synchronization status.

    Reports the latest block stored by db-sync and, when the node answers
    ``cardano-cli query tip``, the node's tip plus a sync percentage derived
    from the two block numbers. The result is cached briefly in Redis.

    If db-sync has no blocks yet, an error payload is returned (uncached).
    """
    cache_key = "sync_status"
    cached = await get_cached(cache_key)
    if cached:
        return cached

    async with db_pool.acquire() as conn:
        # Get latest block in db-sync
        db_tip = await conn.fetchrow("""
            SELECT block_no, epoch_no, slot_no, time
            FROM block ORDER BY id DESC LIMIT 1
        """)
    if not db_tip:
        return {"error": "db_sync_not_ready", "message": "No blocks synced yet"}

    # Check node tip for comparison. cardano-cli is a blocking subprocess, so
    # it runs in a worker thread to keep the event loop responsive.
    node_tip = None
    success, stdout, _stderr = await asyncio.to_thread(
        run_cardano_cli, ["query", "tip", "--mainnet"]
    )
    if success:
        try:
            parsed_tip = json.loads(stdout)
        except json.JSONDecodeError:
            parsed_tip = None
        # Anything other than a JSON object is treated as "no usable tip".
        if isinstance(parsed_tip, dict):
            node_tip = parsed_tip

    result = {
        "db_sync_tip": db_tip["block_no"],
        "epoch_no": db_tip["epoch_no"],
        "slot_no": db_tip["slot_no"],
        "last_block_time": db_tip["time"].isoformat() if db_tip["time"] else None,
        "is_syncing": True,  # Assume syncing unless we have better info
        "node_tip": node_tip.get("block") if node_tip else None,
        "node_connected": node_tip is not None
    }

    # If we have both tips, calculate sync percentage
    if node_tip and db_tip["block_no"]:
        node_block = node_tip.get("block") or 0
        if isinstance(node_block, (int, float)) and node_block > 0:
            sync_pct = (db_tip["block_no"] / node_block) * 100
            result["sync_percentage"] = round(sync_pct, 2)
            result["is_syncing"] = sync_pct < 99.9

    await set_cached(cache_key, result, CACHE_TTLS["sync_status"])
    return result
# ============ Node Integration Endpoints ============
@app.get("/v1/address/{address}/utxos")
async def get_address_utxos(address: str, auth: dict = Depends(require_standard_tier)):
    """
    Query UTxOs for an address directly from the node.

    Standard tier (50+ TRP) or higher. Anonymous tier should use
    /v1/address/{address}/balance (db-sync) instead.

    Results are cached briefly in Redis per address.

    Raises:
        HTTPException 400: malformed address.
        HTTPException 503: node socket unreachable.
        HTTPException 500: cardano-cli failed or returned unparsable output.
    """
    if not validate_address(address):
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_address", "message": "Invalid Cardano address format"}
        )
    cache_key = f"utxos_{address}"
    cached = await get_cached(cache_key)
    if cached:
        return cached

    # Query node directly using cardano-cli. The call is a blocking
    # subprocess, so it runs in a worker thread to keep the event loop free.
    success, stdout, stderr = await asyncio.to_thread(run_cardano_cli, [
        "query", "utxo",
        "--address", address,
        "--mainnet",
        "--output-json"
    ])
    if not success:
        if "Network.Socket.connect" in stderr or "does not exist" in stderr:
            raise HTTPException(
                status_code=503,
                detail={"error": "node_unavailable", "message": "Cardano node not available"}
            )
        logger.error(f"cardano-cli utxo query failed: {stderr}")
        raise HTTPException(
            status_code=500,
            detail={"error": "node_error", "message": "Node command failed"}
        )

    parse_failure = HTTPException(
        status_code=500,
        detail={"error": "parse_error", "message": "Failed to parse node response"}
    )
    try:
        utxo_data = json.loads(stdout)
    except json.JSONDecodeError:
        raise parse_failure
    if not isinstance(utxo_data, dict):
        raise parse_failure

    # Transform to our API format. Keys look like "<tx_hash>#<index>".
    utxos = []
    total_lovelace = 0
    for utxo_id, utxo_info in utxo_data.items():
        tx_hash, separator, tx_index = utxo_id.rpartition("#")
        if not separator or not tx_index.isdigit():
            logger.error(f"Unexpected UTxO id in node response: {utxo_id!r}")
            raise parse_failure
        value = utxo_info.get("value", {})
        lovelace = value.get("lovelace", 0)
        total_lovelace += lovelace
        # Every non-lovelace entry maps a policy id to {asset_name: quantity}.
        tokens = [
            {"policy_id": policy_id, "asset_name": asset_name, "quantity": quantity}
            for policy_id, assets in value.items()
            if policy_id != "lovelace" and isinstance(assets, dict)
            for asset_name, quantity in assets.items()
        ]
        utxos.append({
            "tx_hash": tx_hash,
            "tx_index": int(tx_index),
            "lovelace": lovelace,
            "tokens": tokens
        })

    result = {
        "address": address,
        "utxo_count": len(utxos),
        "total_lovelace": total_lovelace,
        "total_ada": total_lovelace / 1_000_000,
        "utxos": utxos
    }
    await set_cached(cache_key, result, CACHE_TTLS["utxos"])
    return result
@app.post("/v1/tx/submit")
async def submit_transaction(
    request: TxSubmitRequest,
    auth: dict = Depends(require_elevated_tier)
):
    """
    Submit a signed transaction to the network.

    Accepts hex or base64 encoded CBOR transaction.
    Elevated tier (500+ TRP) or master key. Standard tier is node-read-only.

    The transaction is written to a temporary file, handed to
    ``cardano-cli transaction submit`` and the file is removed afterwards.

    Raises:
        HTTPException 429: tx-submit rate limit exceeded.
        HTTPException 400: bad encoding, invalid CBOR, or node rejection.
        HTTPException 503: node socket unreachable.
    """
    # Check tx submission rate limit (elevated tier still has limits)
    allowed, retry_after = await check_rate_limit(auth["identifier"], auth["tier"], "tx_submit")
    if not allowed:
        raise HTTPException(
            status_code=429,
            detail={"error": "rate_limit_exceeded", "retry_after": retry_after}
        )

    # Decode the transaction
    try:
        tx_bytes = decode_hex_or_base64(request.tx)
    except ValueError as e:
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_encoding", "message": str(e)}
        )

    # Validate CBOR before shelling out.
    try:
        tx_cbor = cbor2.loads(tx_bytes)
    except Exception:
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_tx", "message": "Transaction is not valid CBOR"}
        )

    # Cardano tx hash is blake2b of the tx body (index 0), not the full tx.
    # NOTE(review): the body is re-serialized here; if the submitted body was
    # not encoded exactly as cbor2 re-encodes it, this digest may differ from
    # the on-chain tx id. Hashing the original body bytes would avoid that.
    try:
        if isinstance(tx_cbor, (list, tuple)) and len(tx_cbor) > 0:
            tx_body_cbor = cbor2.dumps(tx_cbor[0])
            tx_hash = hashlib.blake2b(tx_body_cbor, digest_size=32).hexdigest()
        else:
            # Fallback if structure is unexpected
            tx_hash = hashlib.blake2b(tx_bytes, digest_size=32).hexdigest()
    except Exception as e:
        logger.error(f"Error calculating tx hash: {e}")
        tx_hash = hashlib.blake2b(tx_bytes, digest_size=32).hexdigest()

    tx_file = None
    try:
        # Write to temp file for cardano-cli. Done inside the try so the file
        # is removed even if the write itself fails.
        with tempfile.NamedTemporaryFile(suffix=".signed", delete=False) as f:
            tx_file = f.name
            f.write(tx_bytes)

        # Submit via cardano-cli. This can block for up to the timeout, so it
        # runs in a worker thread instead of stalling the event loop.
        success, stdout, stderr = await asyncio.to_thread(
            run_cardano_cli,
            ["transaction", "submit", "--tx-file", tx_file, "--mainnet"],
            60,
        )
        if not success:
            if "Network.Socket.connect" in stderr or "does not exist" in stderr:
                raise HTTPException(
                    status_code=503,
                    detail={"error": "node_unavailable", "message": "Cardano node not available"}
                )
            # Parse common errors (these are safe to expose)
            if "OutsideValidityIntervalUTxO" in stderr:
                raise HTTPException(
                    status_code=400,
                    detail={"error": "validity_interval", "message": "Transaction validity interval expired"}
                )
            if "BadInputsUTxO" in stderr:
                raise HTTPException(
                    status_code=400,
                    detail={"error": "bad_inputs", "message": "One or more inputs already spent"}
                )
            if "ValueNotConservedUTxO" in stderr:
                raise HTTPException(
                    status_code=400,
                    detail={"error": "value_not_conserved", "message": "Input/output value mismatch"}
                )
            logger.error(f"tx submit failed: {stderr}")
            raise HTTPException(
                status_code=400,
                detail={"error": "submit_failed", "message": "Node command failed"}
            )
        logger.info(f"Transaction submitted: {tx_hash[:16]}... by {auth['identifier'][:16]}...")
        return TxSubmitResponse(
            tx_hash=tx_hash,
            message="Transaction successfully submitted"
        )
    finally:
        # Clean up temp file (best-effort; a failure here must not mask the result)
        if tx_file is not None:
            try:
                os.unlink(tx_file)
            except OSError as e:
                logger.warning(f"Could not remove temp tx file: {e}")
@app.get("/v1/protocol-params")
async def get_protocol_params(auth: dict = Depends(require_standard_tier)):
    """
    Get current epoch protocol parameters from the node. Cached 5 min.

    Standard tier (50+ TRP) or higher.

    The cache lives in process memory. A lock serialises refreshes so that
    concurrent requests arriving on a cold or expired cache trigger a single
    node query; the others wait and then read the fresh entry.

    Raises:
        HTTPException 503: node socket unreachable.
        HTTPException 500: cardano-cli failed or returned unparsable output.
    """
    global protocol_params_cache
    # Lock to prevent cache stampede on first-load.
    async with _params_lock:
        # Check cache inside the lock
        if protocol_params_cache["data"] and protocol_params_cache["expires"] > time.time():
            return protocol_params_cache["data"]

        # Query from node. cardano-cli is a blocking subprocess; running it in
        # a worker thread keeps the event loop serving other requests while
        # this coroutine (and any waiters on the lock) are suspended.
        success, stdout, stderr = await asyncio.to_thread(run_cardano_cli, [
            "query", "protocol-parameters",
            "--mainnet"
        ])
        if not success:
            if "Network.Socket.connect" in stderr or "does not exist" in stderr:
                raise HTTPException(
                    status_code=503,
                    detail={"error": "node_unavailable", "message": "Cardano node not available"}
                )
            logger.error(f"protocol-params query failed: {stderr}")
            raise HTTPException(
                status_code=500,
                detail={"error": "node_error", "message": "Node command failed"}
            )
        try:
            params = json.loads(stdout)
        except json.JSONDecodeError:
            raise HTTPException(
                status_code=500,
                detail={"error": "parse_error", "message": "Failed to parse protocol parameters"}
            )

        # Cache for 5 minutes
        protocol_params_cache = {
            "data": params,
            "expires": time.time() + CACHE_TTLS["protocol_params"]
        }
        return params
# ============ Auth Endpoints (TRP-Gated) ============
@app.post("/v1/auth/challenge")
async def create_auth_challenge(request: AuthChallengeRequest, req: Request):
    """
    Issue a one-time nonce for an address to sign with CIP-8.

    The signed nonce is then submitted to /v1/auth/verify. Two per-address
    throttles apply: at most 5 requests per minute, and at most 10
    outstanding challenges within the nonce lifetime (5 minutes).
    """
    address = request.address.strip()
    if not validate_address(address):
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_address", "message": "Invalid Cardano address format"}
        )

    # Throttle 1: 5 challenges/min per address, counted in a per-minute bucket.
    minute_bucket = int(time.time() // 60)
    per_minute_key = f"challenge_rate:{address}:{minute_bucket}"
    requests_this_minute = await redis_client.incr(per_minute_key)
    if requests_this_minute == 1:
        await redis_client.expire(per_minute_key, 60)
    if requests_this_minute > 5:
        raise HTTPException(
            status_code=429,
            detail={"error": "too_many_challenges", "message": "Too many challenge requests. Try again in a minute."}
        )

    # Throttle 2: cap outstanding (unverified) challenges so they cannot
    # accumulate across minute buckets.
    pending_key = f"challenge_count:{address}"
    pending = await redis_client.incr(pending_key)
    if pending == 1:
        await redis_client.expire(pending_key, 300)  # same 5 min lifetime as a nonce
    if pending > 10:
        # No challenge is issued, so undo the increment before refusing.
        await redis_client.decr(pending_key)
        raise HTTPException(
            status_code=429,
            detail={"error": "too_many_challenges", "message": "Too many outstanding challenges for this address."}
        )

    # Create the nonce and persist it for 5 minutes.
    nonce = secrets.token_hex(32)
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=5)
    stored_record = json.dumps({"address": address, "created_at": datetime.now(timezone.utc).isoformat()})
    await redis_client.setex(f"auth_challenge:{address}:{nonce}", 300, stored_record)

    return AuthChallengeResponse(nonce=nonce, expires_at=expires_at.isoformat())
@app.post("/v1/auth/verify")
async def verify_auth(request: AuthVerifyRequest):
    """
    Verify CIP-8 signature and issue API key based on TRP balance.

    Tiers:
    - 0 TRP: anonymous (no key issued)
    - 50+ TRP: standard (100 req/min)
    - 500+ TRP: elevated (1000 req/min)

    TRP-gated keys expire after 48 hours and must be re-authenticated.

    The nonce is consumed before the signature is checked, so a failed
    attempt burns the challenge and the caller must request a new one.

    Raises:
        HTTPException 400: invalid address, or nonce unknown/expired/already used.
        HTTPException 401: signature verification failed.
        HTTPException 403: balance maps to the anonymous tier.
    """
    # Strip exactly like /v1/auth/challenge does, so both endpoints build the
    # same Redis key for the same caller-supplied address.
    address = request.address.strip()
    nonce = request.nonce
    if not validate_address(address):
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_address", "message": "Invalid Cardano address format"}
        )

    # Atomic GET+DEL the nonce — closes the verify-twice race window.
    challenge_key = f"auth_challenge:{address}:{nonce}"
    async with redis_client.pipeline(transaction=True) as pipe:
        await pipe.get(challenge_key)
        await pipe.delete(challenge_key)
        challenge_data, deleted = await pipe.execute()
    if not challenge_data or not deleted:
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_nonce", "message": "Nonce expired or already used"}
        )

    # Nonce consumed — drop one from the outstanding counter.
    outstanding_key = f"challenge_count:{address}"
    remaining = await redis_client.decr(outstanding_key)
    if remaining < 0:
        # The counter had already expired, so DECR recreated it at -1 with no
        # TTL. Remove it rather than leave a permanent negative counter that
        # would also loosen the outstanding-challenge cap.
        await redis_client.delete(outstanding_key)

    # Verify CIP-8 signature
    if not verify_cip8_signature(address, nonce, request.signature, request.key):
        raise HTTPException(
            status_code=401,
            detail={"error": "invalid_signature", "message": "Signature verification failed"}
        )

    # Get TRP balance
    trp_balance = await get_trp_balance(address)
    tier = get_tier_from_trp_balance(trp_balance)

    # Don't issue key for anonymous tier
    if tier == "anonymous":
        raise HTTPException(
            status_code=403,
            detail={
                "error": "insufficient_trp",
                "message": f"Minimum {TRP_TIERS['standard']} TRP required for API key",
                "trp_balance": trp_balance,
                "required": TRP_TIERS["standard"]
            }
        )

    # Generate API key
    new_key = f"capi_{secrets.token_hex(24)}"

    # TRP-gated keys expire 48h after issue — re-auth via /v1/auth/refresh.
    expires_at = datetime.now(timezone.utc) + timedelta(hours=TRP_KEY_EXPIRY_HOURS)

    # Only the hash of the key is persisted.
    key_hash = hash_api_key(new_key)
    await redis_client.hset(f"apikey:{key_hash}", mapping={
        "label": f"TRP-gated:{address[:20]}...",
        "tier": tier,
        "owner": address,
        "trp_balance": str(trp_balance),
        "created_at": datetime.now(timezone.utc).isoformat(),
        "expires_at": expires_at.isoformat()
    })

    # Track TRP-gated keys in a set — the refresh task batches a single
    # Postgres query over the owner addresses instead of N+1.
    await redis_client.sadd("trp_gated_keys", key_hash)

    logger.info(f"Issued {tier} API key for {address[:20]}... (TRP: {trp_balance}, expires: {expires_at.isoformat()})")

    # Return the raw key to the user (only time it's exposed)
    return AuthVerifyResponse(
        api_key=new_key,
        tier=tier,
        trp_balance=trp_balance
    )
@app.post("/v1/auth/refresh")
async def refresh_auth(
    x_api_key: str = Header(..., description="API key to refresh"),
    auth: dict = Depends(get_auth_context)
):
    """
    Re-check TRP balance and upgrade/downgrade tier for an existing key.

    Self-service only. The API key in the header is both the authentication
    AND the key being refreshed — you cannot use key A to refresh key B.

    Returns:
        dict with the key, new and previous tier, current TRP balance, whether
        the tier changed, the new expiry and a human-readable message.

    Raises:
        HTTPException 401: caller resolved to the anonymous tier.
        HTTPException 400: master key, or a key with no owner address.
    """
    if auth["tier"] == "anonymous":
        raise HTTPException(
            status_code=401,
            detail={"error": "unauthorized", "message": "Valid API key required"}
        )
    if auth["tier"] == "master":
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_request", "message": "Cannot refresh master key"}
        )

    owner = auth.get("owner")
    if not owner:
        raise HTTPException(
            status_code=400,
            detail={"error": "not_trp_gated", "message": "This key is not TRP-gated"}
        )

    # Get current TRP balance
    trp_balance = await get_trp_balance(owner)
    new_tier = get_tier_from_trp_balance(trp_balance)
    old_tier = auth["tier"]

    # Refresh resets the 48h expiry too.
    expires_at = datetime.now(timezone.utc) + timedelta(hours=TRP_KEY_EXPIRY_HOURS)
    key_hash = hash_api_key(x_api_key)
    await redis_client.hset(f"apikey:{key_hash}", mapping={
        "tier": new_tier,
        "trp_balance": str(trp_balance),
        "expires_at": expires_at.isoformat()
    })

    # Tier names are strings; comparing them directly is alphabetical
    # ("elevated" < "standard") and reports upgrades as downgrades. Rank them
    # explicitly instead. Unknown tier names rank below everything.
    tier_rank = {"anonymous": 0, "standard": 1, "elevated": 2, "master": 3}
    tier_changed = new_tier != old_tier
    if tier_changed:
        went_up = tier_rank.get(new_tier, -1) > tier_rank.get(old_tier, -1)
        direction = "upgraded" if went_up else "downgraded"
        message = f"Tier {direction} from {old_tier} to {new_tier}"
    else:
        message = "Tier unchanged"

    return {
        "api_key": x_api_key,
        "tier": new_tier,
        "previous_tier": old_tier,
        "trp_balance": trp_balance,
        "tier_changed": tier_changed,
        "expires_at": expires_at.isoformat(),
        "message": message
    }
# ============ Block Endpoints ============
@app.get("/v1/block/latest")
async def get_latest_block(auth: dict = Depends(get_auth_context)):
    """Return the newest block row in db-sync, preferring a cached copy."""
    cache_key = "block_latest"
    hit = await get_cached(cache_key)
    if hit:
        return hit

    async with db_pool.acquire() as conn:
        row = await conn.fetchrow("""
            SELECT b.block_no, b.epoch_no, b.slot_no,
                   encode(b.hash, 'hex') as hash,
                   b.tx_count, b.time
            FROM block b
            ORDER BY b.id DESC LIMIT 1
        """)

    # No row at all means the block table is still empty.
    if not row:
        raise HTTPException(status_code=503, detail={"error": "db_sync_not_ready", "message": "No blocks synced yet"})

    # Plain columns are copied through as-is; only the timestamp needs formatting.
    payload = {
        column: row[column]
        for column in ("block_no", "epoch_no", "slot_no", "hash", "tx_count")
    }
    block_time = row["time"]
    payload["time"] = block_time.isoformat() if block_time else None

    await set_cached(cache_key, payload, CACHE_TTLS["block_latest"])
    return payload
@app.get("/v1/block/{block_no}")
async def get_block(block_no: int, auth: dict = Depends(get_auth_context)):
    """
    Get a specific block by number.

    Raises:
        HTTPException 404: no block with that number exists, including numbers
            that fall outside the range the block_no column can hold.
    """
    # NOTE(review): db-sync declares block.block_no as a non-negative 31-bit
    # integer. A larger Python int cannot be encoded for that query parameter
    # and would escape as an unhandled database-driver error; such a block
    # cannot exist, so answer with the ordinary 404 instead.
    if not 0 <= block_no <= 2_147_483_647:
        raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Block {block_no} not found"})

    cache_key = f"block_{block_no}"
    cached = await get_cached(cache_key)
    if cached:
        return cached

    async with db_pool.acquire() as conn:
        block = await conn.fetchrow("""
            SELECT b.block_no, b.epoch_no, b.slot_no,
                   encode(b.hash, 'hex') as hash,
                   b.tx_count, b.time
            FROM block b
            WHERE b.block_no = $1
        """, block_no)

    if not block:
        raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Block {block_no} not found"})

    result = {
        "block_no": block["block_no"],
        "epoch_no": block["epoch_no"],
        "slot_no": block["slot_no"],
        "hash": block["hash"],
        "tx_count": block["tx_count"],
        "time": block["time"].isoformat() if block["time"] else None
    }
    await set_cached(cache_key, result, CACHE_TTLS["tx_details"])  # Blocks are immutable
    return result
# ============ Address Endpoints ============
@app.get("/v1/address/{address}/balance")
async def get_address_balance(address: str, auth: dict = Depends(get_auth_context)):
    """
    Get address balance including native tokens.

    Returns:
        dict with the address, its unspent lovelace (also expressed in ADA)
        and one entry per native asset held in unspent outputs.

    Raises:
        HTTPException 400: the address fails validate_address.
    """

    def _display_name(name_hex: str) -> str:
        """Readable asset name when the raw bytes are clean UTF-8, else the hex."""
        try:
            text = bytes.fromhex(name_hex).decode("utf-8")
        except ValueError:  # covers UnicodeDecodeError as well
            return name_hex
        if not text or "\x00" in text:
            return name_hex
        return text

    # Validate address
    if not validate_address(address):
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_address", "message": "Invalid Cardano address format"}
        )

    cache_key = f"balance_{address}"
    cached = await get_cached(cache_key)
    if cached:
        return cached

    async with db_pool.acquire() as conn:
        # Get ADA balance from UTxOs (outputs with no matching tx_in row).
        balance = await conn.fetchrow("""
            SELECT COALESCE(SUM(value), 0) as lovelace
            FROM tx_out txo
            JOIN tx ON tx.id = txo.tx_id
            LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
            WHERE txo.address = $1 AND txi.id IS NULL
        """, address)

        # Get native tokens. The asset name is fetched as hex only and decoded
        # in Python: convert_from(name, 'UTF8') raises inside Postgres for
        # names that are not valid UTF-8 (or contain a NUL byte), which would
        # fail the whole request for any address holding such an asset.
        tokens = await conn.fetch("""
            SELECT
                encode(ma.policy, 'hex') as policy_id,
                encode(ma.name, 'hex') as asset_name_hex,
                ma.fingerprint,
                COALESCE(SUM(mto.quantity), 0) as quantity
            FROM ma_tx_out mto
            JOIN multi_asset ma ON ma.id = mto.ident
            JOIN tx_out txo ON txo.id = mto.tx_out_id
            LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
            WHERE txo.address = $1 AND txi.id IS NULL
            GROUP BY ma.id, ma.policy, ma.name, ma.fingerprint
            HAVING SUM(mto.quantity) > 0
        """, address)

    lovelace = int(balance["lovelace"]) if balance else 0
    result = {
        "address": address,
        "lovelace": lovelace,
        "ada": lovelace / 1_000_000,
        "tokens": [
            {
                "policy_id": t["policy_id"],
                "asset_name": _display_name(t["asset_name_hex"]),
                "asset_name_hex": t["asset_name_hex"],
                "fingerprint": t["fingerprint"],
                "quantity": int(t["quantity"])
            }
            for t in tokens
        ]
    }
    await set_cached(cache_key, result, CACHE_TTLS["balance"])
    return result
@app.get("/v1/address/{address}/tokens")
async def get_address_tokens(
    address: str,
    page: int = Query(1, ge=1, le=10000, description="Page number (max 10000)"),
    limit: int = Query(100, ge=1, le=1000, description="Results per page (max 1000)"),
    auth: dict = Depends(get_auth_context)
):
    """
    Get native tokens held by an address. Paginated.

    Tokens are ordered by quantity, largest first; ties are broken by the
    asset's row id so that pages do not overlap or skip entries.

    Raises:
        HTTPException 400: the address fails validate_address.
    """

    def _display_name(name_hex: str) -> str:
        """Readable asset name when the raw bytes are clean UTF-8, else the hex."""
        try:
            text = bytes.fromhex(name_hex).decode("utf-8")
        except ValueError:  # covers UnicodeDecodeError as well
            return name_hex
        if not text or "\x00" in text:
            return name_hex
        return text

    # Validate address
    if not validate_address(address):
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_address", "message": "Invalid Cardano address format"}
        )

    offset = (page - 1) * limit
    cache_key = f"tokens_{address}_{page}_{limit}"
    cached = await get_cached(cache_key)
    if cached:
        return cached

    async with db_pool.acquire() as conn:
        # Get total count for pagination info
        count_result = await conn.fetchrow("""
            SELECT COUNT(DISTINCT ma.id) as total
            FROM ma_tx_out mto
            JOIN multi_asset ma ON ma.id = mto.ident
            JOIN tx_out txo ON txo.id = mto.tx_out_id
            LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
            WHERE txo.address = $1 AND txi.id IS NULL
        """, address)
        total_count = count_result["total"] if count_result else 0

        # LIMIT/OFFSET for pagination. The asset name comes back as hex only
        # and is decoded in Python: convert_from(name, 'UTF8') raises inside
        # Postgres for names that are not valid UTF-8, failing the request.
        tokens = await conn.fetch("""
            SELECT
                encode(ma.policy, 'hex') as policy_id,
                encode(ma.name, 'hex') as asset_name_hex,
                ma.fingerprint,
                COALESCE(SUM(mto.quantity), 0) as quantity
            FROM ma_tx_out mto
            JOIN multi_asset ma ON ma.id = mto.ident
            JOIN tx_out txo ON txo.id = mto.tx_out_id
            LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
            WHERE txo.address = $1 AND txi.id IS NULL
            GROUP BY ma.id, ma.policy, ma.name, ma.fingerprint
            HAVING SUM(mto.quantity) > 0
            ORDER BY quantity DESC, ma.id
            LIMIT $2 OFFSET $3
        """, address, limit, offset)

    result = {
        "address": address,
        "page": page,
        "limit": limit,
        "total_count": total_count,
        "total_pages": (total_count + limit - 1) // limit if total_count > 0 else 0,
        "tokens": [
            {
                "policy_id": t["policy_id"],
                "asset_name": _display_name(t["asset_name_hex"]),
                "asset_name_hex": t["asset_name_hex"],
                "fingerprint": t["fingerprint"],
                "quantity": int(t["quantity"])
            }
            for t in tokens
        ]
    }
    await set_cached(cache_key, result, CACHE_TTLS["tokens"])
    return result
@app.get("/v1/address/{address}/transactions")
async def get_address_transactions(
    address: str,
    page: int = Query(1, ge=1, le=10000, description="Page number (max 10000)"),
    limit: int = Query(20, ge=1, le=100),
    order: str = Query("desc", regex="^(asc|desc)$"),
    auth: dict = Depends(get_auth_context)
):
    """
    Get transactions for an address. Paginated.

    A transaction is included when the address appears on any of its outputs
    or owns any output it spends. Results are ordered by block time, with the
    transaction row id as tiebreaker so pages are stable within a block.

    Raises:
        HTTPException 400: the address fails validate_address.
    """
    # Validate address
    if not validate_address(address):
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_address", "message": "Invalid Cardano address format"}
        )

    cache_key = f"txs_{address}_{page}_{limit}_{order}"
    cached = await get_cached(cache_key)
    if cached:
        return cached

    offset = (page - 1) * limit
    # `order` is constrained to asc|desc by the Query pattern, so only these
    # two literals can ever be interpolated into the SQL below.
    order_dir = "DESC" if order == "desc" else "ASC"

    async with db_pool.acquire() as conn:
        # Count total transactions: union the two id sets first, then count.
        # (A UNION of two separate COUNT(...) queries yields up to two rows,
        # neither of which is the size of the combined set.)
        count_result = await conn.fetchrow("""
            SELECT COUNT(*) as total FROM (
                SELECT tx_out.tx_id as id
                FROM tx_out
                WHERE tx_out.address = $1
                UNION
                SELECT tx_in.tx_in_id as id
                FROM tx_in
                JOIN tx_out ON tx_out.tx_id = tx_in.tx_out_id AND tx_out.index = tx_in.tx_out_index
                WHERE tx_out.address = $1
            ) addr_tx_ids
        """, address)
        total_count = count_result["total"] if count_result else 0

        # Get transactions
        txs = await conn.fetch(f"""
            WITH addr_txs AS (
                SELECT DISTINCT tx.id, tx.hash, tx.fee, b.block_no, b.time
                FROM tx
                JOIN block b ON b.id = tx.block_id
                JOIN tx_out ON tx_out.tx_id = tx.id
                WHERE tx_out.address = $1
                UNION
                SELECT DISTINCT tx.id, tx.hash, tx.fee, b.block_no, b.time
                FROM tx
                JOIN block b ON b.id = tx.block_id
                JOIN tx_in ON tx_in.tx_in_id = tx.id
                JOIN tx_out ON tx_out.tx_id = tx_in.tx_out_id AND tx_out.index = tx_in.tx_out_index
                WHERE tx_out.address = $1
            )
            SELECT * FROM addr_txs
            ORDER BY time {order_dir}, id {order_dir}
            LIMIT $2 OFFSET $3
        """, address, limit, offset)

    result = {
        "address": address,
        "page": page,
        "limit": limit,
        "total_count": total_count,
        "total_pages": (total_count + limit - 1) // limit if total_count > 0 else 0,
        "transactions": [
            {
                "tx_hash": tx["hash"].hex() if isinstance(tx["hash"], bytes) else tx["hash"],
                "block_no": tx["block_no"],
                "block_time": tx["time"].isoformat() if tx["time"] else None,
                "fee": int(tx["fee"]) if tx["fee"] else 0
            }
            for tx in txs
        ]
    }
    await set_cached(cache_key, result, CACHE_TTLS["transactions"])
    return result
# ============ Transaction Endpoints ============
@app.get("/v1/tx/{tx_hash}")
async def get_transaction(tx_hash: str, auth: dict = Depends(get_auth_context)):
    """
    Get transaction details by hash.

    Returns:
        dict with the hash, block number/time, epoch, fee, the spent inputs
        (address, value, source tx hash and index), the outputs, and the
        metadata keyed by label (None when the transaction has none).

    Raises:
        HTTPException 400: the hash fails validate_tx_hash.
        HTTPException 404: no transaction with that hash is in db-sync.
    """
    # Validate tx hash
    if not validate_tx_hash(tx_hash):
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_tx_hash", "message": "Invalid transaction hash format (expected 64 hex chars)"}
        )

    # Clean up hash (remove 0x prefix if present) BEFORE building the cache
    # key, so case/prefix variants of one hash share a single cache entry.
    # The cached payload carries the hash as returned by the database, so it
    # is identical for every spelling.
    clean_hash = tx_hash.lower().replace("0x", "")
    cache_key = f"tx_{clean_hash}"
    cached = await get_cached(cache_key)
    if cached:
        return cached

    async with db_pool.acquire() as conn:
        tx = await conn.fetchrow("""
            SELECT tx.id, encode(tx.hash, 'hex') as hash, tx.fee,
                   b.block_no, b.time, b.epoch_no
            FROM tx
            JOIN block b ON b.id = tx.block_id
            WHERE tx.hash = decode($1, 'hex')
        """, clean_hash)
        if not tx:
            raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Transaction {tx_hash} not found"})

        # Get inputs: each tx_in row points at the output it spends.
        inputs = await conn.fetch("""
            SELECT txo.address, txo.value,
                   encode(tx_src.hash, 'hex') as source_tx_hash,
                   txo.index as source_tx_index
            FROM tx_in txi
            JOIN tx_out txo ON txo.tx_id = txi.tx_out_id AND txo.index = txi.tx_out_index
            JOIN tx tx_src ON tx_src.id = txi.tx_out_id
            WHERE txi.tx_in_id = $1
        """, tx["id"])

        # Get outputs
        outputs = await conn.fetch("""
            SELECT address, value, index
            FROM tx_out
            WHERE tx_id = $1
            ORDER BY index
        """, tx["id"])

        # Get metadata
        metadata = await conn.fetch("""
            SELECT key, json
            FROM tx_metadata
            WHERE tx_id = $1
        """, tx["id"])

    result = {
        "tx_hash": tx["hash"],
        "block_no": tx["block_no"],
        "block_time": tx["time"].isoformat() if tx["time"] else None,
        "epoch_no": tx["epoch_no"],
        "fee": int(tx["fee"]) if tx["fee"] else 0,
        "inputs": [
            {
                "address": i["address"],
                "value": int(i["value"]),
                "source_tx_hash": i["source_tx_hash"],
                "source_tx_index": i["source_tx_index"]
            }
            for i in inputs
        ],
        "outputs": [
            {
                "address": o["address"],
                "value": int(o["value"]),
                "index": o["index"]
            }
            for o in outputs
        ],
        "metadata": {str(m["key"]): m["json"] for m in metadata} if metadata else None
    }
    await set_cached(cache_key, result, CACHE_TTLS["tx_details"])
    return result
# ============ Asset Endpoints ============
@app.get("/v1/asset/{policy_id}/info")
async def get_asset_info(
    policy_id: str,
    page: int = Query(1, ge=1, le=10000, description="Page number (max 10000)"),
    limit: int = Query(100, ge=1, le=500, description="Results per page (max 500)"),
    auth: dict = Depends(get_auth_context)
):
    """
    Get info about all assets under a policy ID. Paginated.

    Raises:
        HTTPException 400: the policy ID fails validate_policy_id.
        HTTPException 404: the first page is empty, i.e. the policy has no assets.
    """

    def _display_name(name_hex: str) -> str:
        """Readable asset name when the raw bytes are clean UTF-8, else the hex."""
        try:
            text = bytes.fromhex(name_hex).decode("utf-8")
        except ValueError:  # covers UnicodeDecodeError as well
            return name_hex
        if not text or "\x00" in text:
            return name_hex
        return text

    # Validate policy ID
    if not validate_policy_id(policy_id):
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_policy_id", "message": "Invalid policy ID format (expected 56 hex chars)"}
        )

    offset = (page - 1) * limit
    cache_key = f"asset_info_{policy_id}_{page}_{limit}"
    cached = await get_cached(cache_key)
    if cached:
        return cached

    clean_policy = policy_id.lower().replace("0x", "")

    async with db_pool.acquire() as conn:
        # Get total count for pagination
        count_result = await conn.fetchrow("""
            SELECT COUNT(*) as total
            FROM multi_asset ma
            WHERE ma.policy = decode($1, 'hex')
        """, clean_policy)
        total_count = count_result["total"] if count_result else 0

        # LIMIT/OFFSET for pagination. The asset name comes back as hex only
        # and is decoded in Python: convert_from(name, 'UTF8') raises inside
        # Postgres for names that are not valid UTF-8, failing the request.
        assets = await conn.fetch("""
            SELECT
                encode(ma.name, 'hex') as asset_name_hex,
                ma.fingerprint,
                (SELECT COALESCE(SUM(quantity), 0) FROM ma_tx_mint WHERE ident = ma.id) as total_minted,
                (SELECT COUNT(*) FROM ma_tx_mint WHERE ident = ma.id AND quantity > 0) as mint_count
            FROM multi_asset ma
            WHERE ma.policy = decode($1, 'hex')
            ORDER BY ma.id
            LIMIT $2 OFFSET $3
        """, clean_policy, limit, offset)

    if not assets and page == 1:
        raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Policy {policy_id} not found"})

    result = {
        "policy_id": policy_id,
        "page": page,
        "limit": limit,
        "total_count": total_count,
        "total_pages": (total_count + limit - 1) // limit if total_count > 0 else 0,
        "assets": [
            {
                "asset_name": _display_name(a["asset_name_hex"]),
                "asset_name_hex": a["asset_name_hex"],
                "fingerprint": a["fingerprint"],
                "total_supply": int(a["total_minted"]) if a["total_minted"] else 0,
                "mint_count": a["mint_count"]
            }
            for a in assets
        ]
    }
    await set_cached(cache_key, result, CACHE_TTLS["asset_info"])
    return result
@app.get("/v1/asset/{policy_id}/{asset_name}/holders")
async def get_asset_holders(
    policy_id: str,
    asset_name: str,
    limit: int = Query(20, ge=1, le=100),
    auth: dict = Depends(get_auth_context)
):
    """
    Get top holders of a specific asset.

    `asset_name` is the hex-encoded asset name; it is passed to
    decode(..., 'hex') when looking the asset up.

    Raises:
        HTTPException 400: invalid policy ID, or asset name that is not
            1-32 bytes of hex.
        HTTPException 404: no asset with that policy and name.
    """
    # Validate policy ID
    if not validate_policy_id(policy_id):
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_policy_id", "message": "Invalid policy ID format (expected 56 hex chars)"}
        )

    # Validate asset name: whole bytes of hex, at most 32 bytes. Without this,
    # decode($2, 'hex') raises inside Postgres on non-hex or odd-length input
    # and the caller gets a server error instead of a 400.
    if not re.fullmatch(r"(?:[0-9a-fA-F]{2}){1,32}", asset_name):
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_asset_name", "message": "Invalid asset name format (expected hex-encoded name, max 64 hex chars)"}
        )

    cache_key = f"holders_{policy_id}_{asset_name}_{limit}"
    cached = await get_cached(cache_key)
    if cached:
        return cached

    clean_policy = policy_id.lower().replace("0x", "")

    async with db_pool.acquire() as conn:
        # Get asset ID
        asset = await conn.fetchrow("""
            SELECT id FROM multi_asset
            WHERE policy = decode($1, 'hex') AND name = decode($2, 'hex')
        """, clean_policy, asset_name)
        if not asset:
            raise HTTPException(status_code=404, detail={"error": "not_found", "message": "Asset not found"})

        # Get holders
        holders = await conn.fetch("""
            SELECT txo.address, SUM(mto.quantity) as quantity
            FROM ma_tx_out mto
            JOIN tx_out txo ON txo.id = mto.tx_out_id
            LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
            WHERE mto.ident = $1 AND txi.id IS NULL
            GROUP BY txo.address
            HAVING SUM(mto.quantity) > 0
            ORDER BY quantity DESC
            LIMIT $2
        """, asset["id"], limit)

        # Holder count via subquery — naive COUNT DISTINCT + GROUP BY misses
        # the HAVING SUM > 0 filter and over-counts addresses that net to zero.
        holder_count = await conn.fetchrow("""
            SELECT COUNT(*) as count FROM (
                SELECT txo.address
                FROM ma_tx_out mto
                JOIN tx_out txo ON txo.id = mto.tx_out_id
                LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
                WHERE mto.ident = $1 AND txi.id IS NULL
                GROUP BY txo.address
                HAVING SUM(mto.quantity) > 0
            ) sub
        """, asset["id"])

    result = {
        "policy_id": policy_id,
        "asset_name": asset_name,
        "holder_count": holder_count["count"] if holder_count else 0,
        "holders": [
            {"address": h["address"], "quantity": int(h["quantity"])}
            for h in holders
        ]
    }
    await set_cached(cache_key, result, CACHE_TTLS["asset_info"])
    return result
# ============ Pool Endpoints ============
@app.get("/v1/pool/{pool_id}/info")
async def get_pool_info(pool_id: str, auth: dict = Depends(get_auth_context)):
    """
    Get stake pool information.

    `pool_id` may be the pool's bech32 view or its hex hash; it is
    lower-cased before matching.

    Raises:
        HTTPException 404: no pool matches the identifier.
    """
    # The lookup is case-insensitive (the query receives the lower-cased id)
    # and the payload's pool_id comes from the database, so differently-cased
    # spellings can safely share one cache entry.
    lookup_id = pool_id.lower()
    cache_key = f"pool_{lookup_id}"
    cached = await get_cached(cache_key)
    if cached:
        return cached

    async with db_pool.acquire() as conn:
        # Try to find pool by bech32 id or hash.
        # NOTE(review): pool_offline_data can presumably hold several rows per
        # pool (one per fetched metadata version), so the LEFT JOIN may return
        # multiple rows. ORDER BY/LIMIT pins the result to the row with the
        # highest id — assumed to be the newest — instead of an arbitrary one.
        pool = await conn.fetchrow("""
            SELECT
                ph.view as pool_id,
                encode(ph.hash_raw, 'hex') as pool_hash,
                pod.ticker_name as ticker,
                pod.json->>'name' as name,
                pod.json->>'description' as description,
                pu.pledge,
                pu.margin,
                (SELECT COALESCE(SUM(amount), 0) FROM epoch_stake
                 WHERE pool_id = ph.id AND epoch_no = (SELECT MAX(epoch_no) FROM epoch_stake WHERE pool_id = ph.id)) as active_stake,
                (SELECT COUNT(*) FROM block WHERE slot_leader_id IN (SELECT id FROM slot_leader WHERE pool_hash_id = ph.id)) as block_count
            FROM pool_hash ph
            LEFT JOIN pool_offline_data pod ON pod.pool_id = ph.id
            LEFT JOIN pool_update pu ON pu.hash_id = ph.id
                AND pu.registered_tx_id = (SELECT MAX(registered_tx_id) FROM pool_update WHERE hash_id = ph.id)
            WHERE ph.view = $1 OR encode(ph.hash_raw, 'hex') = $1
            ORDER BY pod.id DESC NULLS LAST
            LIMIT 1
        """, lookup_id)

    if not pool:
        raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Pool {pool_id} not found"})

    result = {
        "pool_id": pool["pool_id"],
        "pool_hash": pool["pool_hash"],
        "ticker": pool["ticker"],
        "name": pool["name"],
        "description": pool["description"],
        "pledge": int(pool["pledge"]) if pool["pledge"] else 0,
        "margin": float(pool["margin"]) if pool["margin"] else 0,
        "active_stake": int(pool["active_stake"]) if pool["active_stake"] else 0,
        "block_count": pool["block_count"] or 0
    }
    await set_cached(cache_key, result, CACHE_TTLS["pool_info"])
    return result
# ============ Admin Endpoints ============
@app.post("/admin/keys")
async def create_api_key(key_data: APIKeyCreate, auth: dict = Depends(require_master_key)):
    """
    Create a new API key (master key required).

    Admin-created keys are stored without an expires_at field. Only the hash
    of the key is persisted; the raw key appears once, in this response.
    """
    new_key = f"capi_{secrets.token_hex(24)}"
    key_hash = hash_api_key(new_key)

    # One clock read, so the stored and the returned created_at are identical.
    created_at = datetime.now(timezone.utc).isoformat()

    # Admin-created keys don't expire by default — only TRP-gated keys do.
    await redis_client.hset(f"apikey:{key_hash}", mapping={
        "label": key_data.label,
        "tier": key_data.tier,
        "owner": key_data.owner or "",
        "trp_balance": str(key_data.trp_balance or 0),
        "created_at": created_at
        # No expires_at for admin-created keys
    })

    return APIKeyResponse(
        key=new_key,  # Return raw key only once
        label=key_data.label,
        tier=key_data.tier,
        owner=key_data.owner,
        created_at=created_at
    )
@app.delete("/admin/keys/{key}")
async def revoke_api_key(key: str, auth: dict = Depends(require_master_key)):
    """Delete the stored record for a raw API key (master key required)."""
    digest = hash_api_key(key)
    removed = await redis_client.delete(f"apikey:{digest}")
    if removed:
        # Also drop the hash from the TRP-gated tracking set.
        await redis_client.srem("trp_gated_keys", digest)
        return {"status": "revoked", "key": f"{key[:16]}..."}
    raise HTTPException(status_code=404, detail={"error": "not_found", "message": "API key not found"})
@app.get("/admin/keys")
async def list_api_keys(auth: dict = Depends(require_master_key)):
    """
    List all API keys (shows metadata, not raw keys since we only store hashes).

    Returns:
        dict with "keys" (one metadata entry per stored key) and "count".
    """
    keys = []
    # SCAN may hand back the same key in more than one batch; remember what
    # has been listed so every key appears exactly once.
    seen = set()
    cursor = 0
    while True:
        cursor, found_keys = await redis_client.scan(cursor, match="apikey:*", count=100)
        for key in found_keys:
            if key in seen:
                continue
            seen.add(key)
            data = await redis_client.hgetall(key)
            if not data:
                # Key vanished (revoked/expired) between SCAN and HGETALL.
                continue
            # Key is stored as hash, so we show the hash (last 8 chars) for identification
            key_hash = key.replace("apikey:", "")
            keys.append({
                "key_hash_suffix": f"...{key_hash[-8:]}",
                "label": data.get("label"),
                "tier": data.get("tier"),
                "owner": data.get("owner") or None,
                # `or 0` also covers an empty stored value, which int() rejects.
                "trp_balance": int(data.get("trp_balance") or 0),
                "created_at": data.get("created_at"),
                "expires_at": data.get("expires_at")
            })
        if cursor == 0:
            break
    return {"keys": keys, "count": len(keys)}
@app.post("/admin/refresh-tiers")
async def admin_refresh_tiers(auth: dict = Depends(require_master_key)):
    """Run the TRP tier refresh over all keys on demand (master key required)."""
    await refresh_all_trp_tiers()
    outcome = dict(status="ok", message="TRP tier refresh completed")
    return outcome
@app.get("/admin/stats")
async def get_admin_stats(auth: dict = Depends(require_master_key)):
    """
    Get API usage statistics (master key required).

    Returns:
        dict with the number of rate-limit buckets active in the current
        minute, API key counts by tier, the latest db-sync block and its time,
        the total synced transaction count, and whether the node socket path
        exists.
    """
    # Count rate limit keys (active users in current minute). SCAN may return
    # the same key more than once, so collect into a set before counting.
    current_minute = int(time.time() // 60)
    active_keys = set()
    cursor = 0
    while True:
        cursor, keys = await redis_client.scan(cursor, match=f"ratelimit:general:*:{current_minute}", count=100)
        active_keys.update(keys)
        if cursor == 0:
            break
    active_users = len(active_keys)

    # Count API keys by tier (again de-duplicating SCAN results).
    api_key_counts = {"standard": 0, "elevated": 0, "total": 0}
    counted = set()
    cursor = 0
    while True:
        cursor, keys = await redis_client.scan(cursor, match="apikey:*", count=100)
        for key in keys:
            if key in counted:
                continue
            counted.add(key)
            tier = await redis_client.hget(key, "tier")
            api_key_counts["total"] += 1
            # Explicit tier names: a stored tier of "total" must not bump the
            # aggregate a second time.
            if tier in ("standard", "elevated"):
                api_key_counts[tier] += 1
        if cursor == 0:
            break

    # Get db-sync status
    async with db_pool.acquire() as conn:
        latest_block = await conn.fetchrow("SELECT block_no, time FROM block ORDER BY id DESC LIMIT 1")
        tx_count = await conn.fetchrow("SELECT COUNT(*) as count FROM tx")

    # Check node status (presence of the socket path only, not liveness).
    node_connected = os.path.exists(CARDANO_NODE_SOCKET_PATH)

    return {
        "active_users_this_minute": active_users,
        "api_keys": api_key_counts,
        "db_sync_block": latest_block["block_no"] if latest_block else 0,
        "db_sync_time": latest_block["time"].isoformat() if latest_block and latest_block["time"] else None,
        "total_transactions_synced": tx_count["count"] if tx_count else 0,
        "node_socket_exists": node_connected
    }
if __name__ == "__main__":
    # Direct-run entry point: serve the app on all interfaces, port 8765.
    from uvicorn import run as _serve

    _serve(app, host="0.0.0.0", port=8765)