""" Cardano Chain Data REST API Sits in front of cardano-db-sync PostgreSQL database. Includes node integration for UTxO queries, tx submission, and protocol params. TRP-gated permissionless API keys with CIP-8 wallet signature verification. Known policy IDs: - TRP: 9c4bd4a90cdb73d9ff681215ecf7dea9fb183d916d30487d17098e05 - MAP: 24bd9e7b9ae3a61df79eca72fd8355d0f7767e4c55a04a0d919c019c Access Tiers (strictly enforced): - Anonymous (0 TRP, 20 req/min): db-sync read-only ONLY. Balance, transactions, tokens, blocks, assets, pools, sync status. NO node access whatsoever. - Standard (≥50 TRP, 100 req/min): db-sync read + node read-only. UTxO queries, protocol params. NO transaction submission. - Elevated (≥500 TRP, 1000 req/min): Everything above + POST /v1/tx/submit (transaction broadcasting). - Master key: Unrestricted access. Node endpoints (/v1/address/{addr}/utxos, /v1/protocol-params, /v1/tx/submit) return HTTP 403 for insufficient tier. Security hardening applied 2026-03-21: - Fix #1: Atomic nonce GETDEL to prevent race conditions - Fix #2: X-Forwarded-For only trusted from known proxies - Fix #3: TRP refresh every 10 min + 48h key expiry for TRP-gated keys - Fix #4: SHA-256 hashed key storage in Redis - Fix #5: Generic error messages (no internal detail leakage) - Fix #6: Auth refresh is self-service only (key refreshes itself) - Fix #7: CBOR validation before tx submit - Fix #8: Input validation regex for addresses, tx hashes, policy IDs - Fix #9: Correct tx hash calculation (blake2b of tx body, not full tx) - Fix #10: Enforce key expiry globally in get_api_key_info """ import os import re import json import hashlib import secrets import time import logging import asyncio import subprocess import tempfile import base64 from datetime import datetime, timezone, timedelta from typing import Optional from contextlib import asynccontextmanager from fastapi import FastAPI, Request, HTTPException, Query, Header, Depends, BackgroundTasks from fastapi.responses import JSONResponse from pydantic import BaseModel, Field import asyncpg from asyncpg.exceptions import UndefinedTableError, PostgresError import redis.asyncio as redis import cbor2 # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Environment config DB_HOST = os.getenv("DB_HOST", "postgres-dbsync") DB_PORT = int(os.getenv("DB_PORT", "5432")) DB_NAME = os.getenv("DB_NAME", "cexplorer") DB_USER = os.getenv("DB_USER", "dbsync") DB_PASS = os.getenv("DB_PASS", "") REDIS_HOST = os.getenv("REDIS_HOST", "redis-api") REDIS_PORT = int(os.getenv("REDIS_PORT", "6379")) API_MASTER_KEY = os.getenv("API_MASTER_KEY", "") CARDANO_NODE_SOCKET_PATH = os.getenv("CARDANO_NODE_SOCKET_PATH", "/node-ipc/node.socket") CARDANO_NETWORK = os.getenv("CARDANO_NETWORK", "mainnet") # Known policy IDs TRP_POLICY_ID = "9c4bd4a90cdb73d9ff681215ecf7dea9fb183d916d30487d17098e05" MAP_POLICY_ID = "24bd9e7b9ae3a61df79eca72fd8355d0f7767e4c55a04a0d919c019c" # TRP Tier thresholds TRP_TIERS = { "elevated": 500, "standard": 50, "anonymous": 0 } # Rate limits (requests per minute) RATE_LIMITS = { "anonymous": 20, "standard": 100, "elevated": 1000, "master": float("inf") } # TX Submit rate limits (per minute) TX_SUBMIT_LIMITS = { "anonymous": 0, # blocked "standard": 2, "elevated": 10, "master": float("inf") } # Cache TTLs (seconds) CACHE_TTLS = { "balance": 60, "tokens": 60, "transactions": 30, "block_latest": 10, "tx_details": 300, "asset_info": 120, "pool_info": 120, "sync_status": 5, "protocol_params": 300, # 5 min cache for protocol params "utxos": 10 # Short cache for UTxOs } # TRP-gated key expiry (48 hours) TRP_KEY_EXPIRY_HOURS = 48 # Fix #2: Trusted proxies for X-Forwarded-For TRUSTED_PROXIES = {"127.0.0.1", "::1", "172.22.0.1", "172.17.0.1"} # Fix #8: Input validation regexes ADDR_RE = re.compile(r'^addr1[a-z0-9]{50,120}$') ADDR_TEST_RE = re.compile(r'^addr_test1[a-z0-9]{50,120}$') HEX64_RE = re.compile(r'^[a-fA-F0-9]{64}$') POLICY_RE = re.compile(r'^[a-fA-F0-9]{56}$') # Global connections db_pool: Optional[asyncpg.Pool] = None redis_client: Optional[redis.Redis] = None protocol_params_cache: dict = {"data": None, "expires": 0} # ============ Input Validation (Fix #8) ============ def validate_address(address: str) -> bool: """Validate Cardano address format.""" return bool(ADDR_RE.match(address) or ADDR_TEST_RE.match(address)) def validate_tx_hash(tx_hash: str) -> bool: """Validate transaction hash format (64 hex chars).""" clean = tx_hash.lower().replace("0x", "") return bool(HEX64_RE.match(clean)) def validate_policy_id(policy_id: str) -> bool: """Validate policy ID format (56 hex chars).""" clean = policy_id.lower().replace("0x", "") return bool(POLICY_RE.match(clean)) # ============ Helper Functions ============ def hash_api_key(key: str) -> str: """Hash an API key for storage/lookup. Fix #4.""" return hashlib.sha256(key.encode()).hexdigest() def run_cardano_cli(args: list[str], timeout: int = 30) -> tuple[bool, str, str]: """Run cardano-cli command and return (success, stdout, stderr).""" cmd = ["cardano-cli"] + args env = os.environ.copy() env["CARDANO_NODE_SOCKET_PATH"] = CARDANO_NODE_SOCKET_PATH try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, env=env ) return result.returncode == 0, result.stdout, result.stderr except subprocess.TimeoutExpired: return False, "", "Command timed out" except FileNotFoundError: return False, "", "cardano-cli not found" except Exception as e: logger.error(f"cardano-cli error: {e}") return False, "", "Node command failed" def decode_hex_or_base64(data: str) -> bytes: """Decode hex or base64 encoded data.""" # Try hex first try: if data.startswith("0x"): return bytes.fromhex(data[2:]) return bytes.fromhex(data) except ValueError: pass # Try base64 try: return base64.b64decode(data) except Exception: pass raise ValueError("Invalid hex or base64 encoding") def get_tier_from_trp_balance(balance: int) -> str: """Determine tier based on TRP balance.""" if balance >= TRP_TIERS["elevated"]: return "elevated" elif balance >= TRP_TIERS["standard"]: return "standard" return "anonymous" # ============ CIP-8 Signature Verification ============ def verify_cip8_signature(address: str, nonce: str, signature_hex: str, key_hex: str) -> bool: """ Verify a CIP-8 message signature. CIP-8 spec: https://cips.cardano.org/cip/CIP-8 The signature is over a COSE_Sign1 structure containing the message. We use pycardano for verification when available, with fallback to manual verification. """ try: from pycardano import Address, VerificationKey from pycardano.hash import blake2b_224 # Decode the key (should be a 32-byte Ed25519 public key) key_bytes = bytes.fromhex(key_hex) vkey = VerificationKey.from_primitive(key_bytes) # Verify the public key hashes to the address addr = Address.from_primitive(address) # Get the payment credential hash from the address if addr.payment_part is None: logger.warning("Address has no payment part") return False expected_keyhash = addr.payment_part.payload # Hash the verification key to get keyhash actual_keyhash = blake2b_224(key_bytes) if expected_keyhash != actual_keyhash: logger.warning("Key hash mismatch") return False # Decode the COSE signature signature_bytes = bytes.fromhex(signature_hex) # CIP-8 uses COSE_Sign1 structure # The signature is a CBOR array: [protected_headers, unprotected_headers, payload, signature] cose_sign1 = cbor2.loads(signature_bytes) if not isinstance(cose_sign1, (list, tuple)) or len(cose_sign1) != 4: # Try unwrapping if it's tagged if hasattr(cose_sign1, 'value'): cose_sign1 = cose_sign1.value if not isinstance(cose_sign1, (list, tuple)) or len(cose_sign1) != 4: logger.warning("Invalid COSE_Sign1 structure") return False protected, unprotected, payload, sig = cose_sign1 # The payload should contain our nonce if payload: payload_decoded = payload if isinstance(payload, bytes) else bytes(payload) # Payload might be hex-encoded nonce or raw bytes try: if payload_decoded.hex() != nonce and payload_decoded.decode('utf-8') != nonce: logger.warning("Payload doesn't match nonce") return False except: if payload_decoded.hex() != nonce: logger.warning("Payload doesn't match nonce (hex check)") return False # Build the Sig_structure for verification # Sig_structure = ["Signature1", protected, external_aad, payload] external_aad = b"" sig_structure = cbor2.dumps(["Signature1", protected, external_aad, payload or b""]) # Verify the Ed25519 signature from nacl.signing import VerifyKey from nacl.exceptions import BadSignature verify_key = VerifyKey(key_bytes) try: verify_key.verify(sig_structure, sig) return True except BadSignature: logger.warning("Bad signature") return False except ImportError as e: logger.error(f"Missing dependency for CIP-8 verification: {e}") return False except Exception as e: logger.error(f"CIP-8 verification error: {e}") return False # ============ Lifespan ============ async def refresh_trp_tiers_task(): """Background task to refresh TRP tiers for all gated keys every 10 minutes. Fix #3.""" while True: try: await asyncio.sleep(600) # Run every 10 minutes (was 3600) await refresh_all_trp_tiers() except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in TRP tier refresh task: {e}") async def refresh_all_trp_tiers(): """Check all TRP-gated keys and update tiers if balance changed.""" if not redis_client or not db_pool: return cursor = 0 updated = 0 while True: cursor, keys = await redis_client.scan(cursor, match="apikey:*", count=100) for key in keys: data = await redis_client.hgetall(key) owner = data.get("owner") if owner and owner.startswith("addr"): # This is a TRP-gated key, check balance try: trp_balance = await get_trp_balance(owner) new_tier = get_tier_from_trp_balance(trp_balance) old_tier = data.get("tier", "anonymous") if new_tier != old_tier: await redis_client.hset(key, mapping={ "tier": new_tier, "trp_balance": str(trp_balance) }) logger.info(f"Updated tier for {owner}: {old_tier} -> {new_tier}") updated += 1 except Exception as e: logger.error(f"Error checking TRP balance for {owner}: {e}") if cursor == 0: break if updated > 0: logger.info(f"TRP tier refresh complete: {updated} keys updated") async def get_trp_balance(address: str) -> int: """Get TRP token balance for an address from db-sync.""" async with db_pool.acquire() as conn: result = await conn.fetchrow(""" SELECT COALESCE(SUM(mto.quantity), 0) as quantity FROM ma_tx_out mto JOIN multi_asset ma ON ma.id = mto.ident JOIN tx_out txo ON txo.id = mto.tx_out_id LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index WHERE txo.address = $1 AND ma.policy = decode($2, 'hex') AND txi.id IS NULL """, address, TRP_POLICY_ID) return int(result["quantity"]) if result else 0 @asynccontextmanager async def lifespan(app: FastAPI): """Manage database and redis connections.""" global db_pool, redis_client # Connect to PostgreSQL logger.info(f"Connecting to PostgreSQL at {DB_HOST}:{DB_PORT}/{DB_NAME}") db_pool = await asyncpg.create_pool( host=DB_HOST, port=DB_PORT, database=DB_NAME, user=DB_USER, password=DB_PASS, min_size=2, max_size=10 ) # Connect to Redis logger.info(f"Connecting to Redis at {REDIS_HOST}:{REDIS_PORT}") redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) # Check node socket if os.path.exists(CARDANO_NODE_SOCKET_PATH): logger.info(f"Cardano node socket found at {CARDANO_NODE_SOCKET_PATH}") else: logger.warning(f"Cardano node socket NOT found at {CARDANO_NODE_SOCKET_PATH} - node endpoints will fail") # Start background task for TRP tier refresh tier_refresh_task = asyncio.create_task(refresh_trp_tiers_task()) logger.info("Cardano API started successfully") yield # Cleanup tier_refresh_task.cancel() try: await tier_refresh_task except asyncio.CancelledError: pass if db_pool: await db_pool.close() if redis_client: await redis_client.close() app = FastAPI( title="Cardano Chain Data API", description="REST API for querying Cardano blockchain data via db-sync and cardano-node", version="2.1.0", # Bumped for security fixes lifespan=lifespan ) # ============ Exception Handlers ============ @app.exception_handler(UndefinedTableError) async def handle_undefined_table(request: Request, exc: UndefinedTableError): """Handle missing tables (db-sync still initializing).""" logger.warning(f"Database not ready: {exc}") return JSONResponse( status_code=503, content={"error": "db_sync_not_ready", "message": "Database sync in progress. Tables not yet created."} ) @app.exception_handler(PostgresError) async def handle_postgres_error(request: Request, exc: PostgresError): """Handle general postgres errors. Fix #5: Don't leak internal details.""" logger.error(f"Database error: {exc}") return JSONResponse( status_code=503, content={"error": "database_error", "message": "Internal database error"} ) # ============ Models ============ class APIKeyCreate(BaseModel): label: str tier: str = "standard" # standard or elevated owner: Optional[str] = None # Cardano address for future TRP verification trp_balance: Optional[int] = 0 # For future TRP-gating class APIKeyResponse(BaseModel): key: str label: str tier: str owner: Optional[str] created_at: str class AuthChallengeRequest(BaseModel): address: str = Field(..., description="Cardano address (bech32)") class AuthChallengeResponse(BaseModel): nonce: str expires_at: str class AuthVerifyRequest(BaseModel): address: str = Field(..., description="Cardano address (bech32)") nonce: str = Field(..., description="Nonce from challenge") signature: str = Field(..., description="CIP-8 signature (hex)") key: str = Field(..., description="Public key used for signing (hex)") class AuthVerifyResponse(BaseModel): api_key: str tier: str trp_balance: int class TxSubmitRequest(BaseModel): tx: str = Field(..., description="Signed transaction in hex or base64 CBOR encoding") class TxSubmitResponse(BaseModel): tx_hash: str message: str # ============ Helpers ============ async def get_api_key_info(key: str) -> Optional[dict]: """ Get API key info from Redis using hashed key lookup. Fix #4 and Fix #10. Also enforces key expiry for TRP-gated keys. """ if not redis_client: return None # Hash the key for lookup (Fix #4) key_hash = hash_api_key(key) data = await redis_client.hgetall(f"apikey:{key_hash}") if not data: return None # Fix #10: Check expiry for TRP-gated keys expires_at = data.get("expires_at") if expires_at: try: expiry_time = datetime.fromisoformat(expires_at.replace('Z', '+00:00')) if datetime.now(timezone.utc) > expiry_time: # Key expired, delete it and return None logger.info(f"API key expired for owner {data.get('owner', 'unknown')}") await redis_client.delete(f"apikey:{key_hash}") return None except Exception as e: logger.error(f"Error parsing expires_at: {e}") return data async def check_rate_limit(identifier: str, tier: str, limit_type: str = "general") -> tuple[bool, int]: """Check rate limit. Returns (allowed, retry_after_seconds).""" if tier == "master": return True, 0 limits = TX_SUBMIT_LIMITS if limit_type == "tx_submit" else RATE_LIMITS limit = limits.get(tier, limits.get("anonymous", 0)) if limit == 0: return False, 0 # Blocked entirely key = f"ratelimit:{limit_type}:{identifier}:{int(time.time() // 60)}" current = await redis_client.incr(key) if current == 1: await redis_client.expire(key, 60) if current > limit: return False, 60 - (int(time.time()) % 60) return True, 0 async def get_cached(cache_key: str) -> Optional[dict]: """Get cached response.""" data = await redis_client.get(f"cache:{cache_key}") if data: return json.loads(data) return None async def set_cached(cache_key: str, data: dict, ttl: int): """Cache a response.""" await redis_client.setex(f"cache:{cache_key}", ttl, json.dumps(data)) def get_client_ip(request: Request) -> str: """ Extract client IP from request. Fix #2: Only trust X-Forwarded-For from known proxies. """ client_host = request.client.host if request.client else "unknown" # Only trust X-Forwarded-For if connecting IP is a trusted proxy if client_host in TRUSTED_PROXIES: forwarded = request.headers.get("X-Forwarded-For") if forwarded: return forwarded.split(",")[0].strip() return client_host # ============ Auth Dependency ============ async def get_auth_context( request: Request, x_api_key: Optional[str] = Header(None), api_key: Optional[str] = Query(None) ) -> dict: """Extract authentication context from request.""" key = x_api_key or api_key client_ip = get_client_ip(request) if key: if key == API_MASTER_KEY: return {"tier": "master", "identifier": "master", "label": "master"} key_info = await get_api_key_info(key) if key_info: return { "tier": key_info.get("tier", "standard"), "identifier": hash_api_key(key), # Use hash as identifier "label": key_info.get("label", "unknown"), "owner": key_info.get("owner"), "raw_key": key # Keep raw key for refresh endpoint } return {"tier": "anonymous", "identifier": client_ip, "label": None} async def require_master_key(auth: dict = Depends(get_auth_context)): """Require master key for admin endpoints.""" if auth["tier"] != "master": raise HTTPException(status_code=403, detail="Master key required") return auth async def require_standard_tier(auth: dict = Depends(get_auth_context)): """ Require standard tier or higher for node read endpoints. Anonymous users get db-sync only - no direct node access. """ if auth["tier"] == "anonymous": raise HTTPException( status_code=403, detail={ "error": "tier_required", "message": "Node access requires standard tier or higher (50+ TRP)", "required_tier": "standard", "current_tier": auth["tier"] } ) return auth async def require_elevated_tier(auth: dict = Depends(get_auth_context)): """ Require elevated tier for transaction submission. Standard tier gets read-only node access, elevated gets write. """ if auth["tier"] not in ("elevated", "master"): raise HTTPException( status_code=403, detail={ "error": "tier_required", "message": "Transaction submission requires elevated tier (500+ TRP)", "required_tier": "elevated", "current_tier": auth["tier"] } ) return auth # ============ Middleware ============ @app.middleware("http") async def rate_limit_middleware(request: Request, call_next): """Rate limiting and request logging middleware.""" start_time = time.time() # Skip rate limiting for health check if request.url.path == "/health": return await call_next(request) # Get auth context x_api_key = request.headers.get("X-API-Key") api_key = request.query_params.get("api_key") key = x_api_key or api_key client_ip = get_client_ip(request) tier = "anonymous" identifier = client_ip label = None if key: if key == API_MASTER_KEY: tier = "master" identifier = "master" label = "master" else: key_info = await get_api_key_info(key) if key_info: tier = key_info.get("tier", "standard") identifier = hash_api_key(key) # Use hash as identifier label = key_info.get("label", "unknown") # Check rate limit (general) allowed, retry_after = await check_rate_limit(identifier, tier) if not allowed: logger.warning(f"Rate limit exceeded: {identifier[:16]}... ({tier})") return JSONResponse( status_code=429, content={"error": "rate_limit_exceeded", "retry_after": retry_after} ) # Process request response = await call_next(request) # Log request (don't log full key hash for security) elapsed = int((time.time() - start_time) * 1000) log_id = label or client_ip logger.info(f"{request.method} {request.url.path} | {log_id} | {elapsed}ms | {response.status_code}") return response # ============ Health & Sync ============ @app.get("/health") async def health_check(): """Basic health check.""" return {"status": "ok"} @app.get("/v1/sync/status") async def get_sync_status(auth: dict = Depends(get_auth_context)): """Get db-sync synchronization status.""" cache_key = "sync_status" cached = await get_cached(cache_key) if cached: return cached async with db_pool.acquire() as conn: # Get latest block in db-sync db_tip = await conn.fetchrow(""" SELECT block_no, epoch_no, slot_no, time FROM block ORDER BY id DESC LIMIT 1 """) # Get meta info for sync percentage meta = await conn.fetchrow("SELECT * FROM meta LIMIT 1") if not db_tip: return {"error": "db_sync_not_ready", "message": "No blocks synced yet"} # Check node tip for comparison node_tip = None success, stdout, stderr = run_cardano_cli(["query", "tip", "--mainnet"]) if success: try: node_tip = json.loads(stdout) except: pass result = { "db_sync_tip": db_tip["block_no"], "epoch_no": db_tip["epoch_no"], "slot_no": db_tip["slot_no"], "last_block_time": db_tip["time"].isoformat() if db_tip["time"] else None, "is_syncing": True, # Assume syncing unless we have better info "node_tip": node_tip.get("block") if node_tip else None, "node_connected": node_tip is not None } # If we have both tips, calculate sync percentage if node_tip and db_tip["block_no"]: node_block = node_tip.get("block", 0) if node_block > 0: sync_pct = (db_tip["block_no"] / node_block) * 100 result["sync_percentage"] = round(sync_pct, 2) result["is_syncing"] = sync_pct < 99.9 await set_cached(cache_key, result, CACHE_TTLS["sync_status"]) return result # ============ Node Integration Endpoints ============ @app.get("/v1/address/{address}/utxos") async def get_address_utxos(address: str, auth: dict = Depends(require_standard_tier)): """ Query UTxOs for an address directly from the node. Faster than db-sync for current unspent outputs. Requires: standard tier (50+ TRP) or higher. Anonymous users should use /v1/address/{address}/balance (db-sync) instead. """ # Fix #8: Validate address format if not validate_address(address): raise HTTPException( status_code=400, detail={"error": "invalid_address", "message": "Invalid Cardano address format"} ) cache_key = f"utxos_{address}" cached = await get_cached(cache_key) if cached: return cached # Query node directly using cardano-cli success, stdout, stderr = run_cardano_cli([ "query", "utxo", "--address", address, "--mainnet", "--output-json" ]) if not success: if "Network.Socket.connect" in stderr or "does not exist" in stderr: raise HTTPException( status_code=503, detail={"error": "node_unavailable", "message": "Cardano node not available"} ) # Fix #5: Don't leak stderr details logger.error(f"cardano-cli utxo query failed: {stderr}") raise HTTPException( status_code=500, detail={"error": "node_error", "message": "Node command failed"} ) try: utxo_data = json.loads(stdout) except json.JSONDecodeError: raise HTTPException( status_code=500, detail={"error": "parse_error", "message": "Failed to parse node response"} ) # Transform to our API format utxos = [] total_lovelace = 0 for utxo_id, utxo_info in utxo_data.items(): tx_hash, tx_index = utxo_id.split("#") lovelace = utxo_info.get("value", {}).get("lovelace", 0) total_lovelace += lovelace tokens = [] for policy_id, assets in utxo_info.get("value", {}).items(): if policy_id == "lovelace": continue if isinstance(assets, dict): for asset_name, quantity in assets.items(): tokens.append({ "policy_id": policy_id, "asset_name": asset_name, "quantity": quantity }) utxos.append({ "tx_hash": tx_hash, "tx_index": int(tx_index), "lovelace": lovelace, "tokens": tokens }) result = { "address": address, "utxo_count": len(utxos), "total_lovelace": total_lovelace, "total_ada": total_lovelace / 1_000_000, "utxos": utxos } await set_cached(cache_key, result, CACHE_TTLS["utxos"]) return result @app.post("/v1/tx/submit") async def submit_transaction( request: TxSubmitRequest, auth: dict = Depends(require_elevated_tier) ): """ Submit a signed transaction to the network. Accepts hex or base64 encoded CBOR transaction. Requires: elevated tier (500+ TRP) or master key. Standard tier gets read-only node access. """ # Check tx submission rate limit (elevated tier still has limits) allowed, retry_after = await check_rate_limit(auth["identifier"], auth["tier"], "tx_submit") if not allowed: raise HTTPException( status_code=429, detail={"error": "rate_limit_exceeded", "retry_after": retry_after} ) # Decode the transaction try: tx_bytes = decode_hex_or_base64(request.tx) except ValueError as e: raise HTTPException( status_code=400, detail={"error": "invalid_encoding", "message": str(e)} ) # Fix #7: Validate CBOR before proceeding try: tx_cbor = cbor2.loads(tx_bytes) except Exception: raise HTTPException( status_code=400, detail={"error": "invalid_tx", "message": "Transaction is not valid CBOR"} ) # Fix #9: Calculate correct tx hash from tx body (index 0), not full tx try: if isinstance(tx_cbor, (list, tuple)) and len(tx_cbor) > 0: tx_body_cbor = cbor2.dumps(tx_cbor[0]) tx_hash = hashlib.blake2b(tx_body_cbor, digest_size=32).hexdigest() else: # Fallback if structure is unexpected tx_hash = hashlib.blake2b(tx_bytes, digest_size=32).hexdigest() except Exception as e: logger.error(f"Error calculating tx hash: {e}") tx_hash = hashlib.blake2b(tx_bytes, digest_size=32).hexdigest() # Write to temp file for cardano-cli with tempfile.NamedTemporaryFile(suffix=".signed", delete=False) as f: f.write(tx_bytes) tx_file = f.name try: # Submit via cardano-cli success, stdout, stderr = run_cardano_cli([ "transaction", "submit", "--tx-file", tx_file, "--mainnet" ], timeout=60) if not success: if "Network.Socket.connect" in stderr or "does not exist" in stderr: raise HTTPException( status_code=503, detail={"error": "node_unavailable", "message": "Cardano node not available"} ) # Parse common errors (these are safe to expose) if "OutsideValidityIntervalUTxO" in stderr: raise HTTPException( status_code=400, detail={"error": "validity_interval", "message": "Transaction validity interval expired"} ) if "BadInputsUTxO" in stderr: raise HTTPException( status_code=400, detail={"error": "bad_inputs", "message": "One or more inputs already spent"} ) if "ValueNotConservedUTxO" in stderr: raise HTTPException( status_code=400, detail={"error": "value_not_conserved", "message": "Input/output value mismatch"} ) # Fix #5: Don't leak full stderr for other errors logger.error(f"tx submit failed: {stderr}") raise HTTPException( status_code=400, detail={"error": "submit_failed", "message": "Node command failed"} ) logger.info(f"Transaction submitted: {tx_hash[:16]}... by {auth['identifier'][:16]}...") return TxSubmitResponse( tx_hash=tx_hash, message="Transaction successfully submitted" ) finally: # Clean up temp file try: os.unlink(tx_file) except: pass @app.get("/v1/protocol-params") async def get_protocol_params(auth: dict = Depends(require_standard_tier)): """ Get current epoch protocol parameters from the node. Cached for 5 minutes. Requires: standard tier (50+ TRP) or higher. """ global protocol_params_cache # Check cache if protocol_params_cache["data"] and protocol_params_cache["expires"] > time.time(): return protocol_params_cache["data"] # Query from node success, stdout, stderr = run_cardano_cli([ "query", "protocol-parameters", "--mainnet" ]) if not success: if "Network.Socket.connect" in stderr or "does not exist" in stderr: raise HTTPException( status_code=503, detail={"error": "node_unavailable", "message": "Cardano node not available"} ) # Fix #5: Don't leak stderr logger.error(f"protocol-params query failed: {stderr}") raise HTTPException( status_code=500, detail={"error": "node_error", "message": "Node command failed"} ) try: params = json.loads(stdout) except json.JSONDecodeError: raise HTTPException( status_code=500, detail={"error": "parse_error", "message": "Failed to parse protocol parameters"} ) # Cache for 5 minutes protocol_params_cache = { "data": params, "expires": time.time() + CACHE_TTLS["protocol_params"] } return params # ============ Auth Endpoints (TRP-Gated) ============ @app.post("/v1/auth/challenge") async def create_auth_challenge(request: AuthChallengeRequest): """ Create a challenge nonce for wallet signature verification. The nonce must be signed with CIP-8 and submitted to /v1/auth/verify. """ address = request.address # Fix #8: Validate address format if not validate_address(address): raise HTTPException( status_code=400, detail={"error": "invalid_address", "message": "Invalid Cardano address format"} ) # Generate nonce nonce = secrets.token_hex(32) expires_at = datetime.now(timezone.utc) + timedelta(minutes=5) # Store in Redis with TTL await redis_client.setex( f"auth_challenge:{address}:{nonce}", 300, # 5 minutes TTL json.dumps({"address": address, "created_at": datetime.now(timezone.utc).isoformat()}) ) return AuthChallengeResponse( nonce=nonce, expires_at=expires_at.isoformat() ) @app.post("/v1/auth/verify") async def verify_auth(request: AuthVerifyRequest): """ Verify CIP-8 signature and issue API key based on TRP balance. Tiers: - 0 TRP: anonymous (no key issued) - 50+ TRP: standard (100 req/min) - 500+ TRP: elevated (1000 req/min) TRP-gated keys expire after 48 hours and must be re-authenticated. """ address = request.address nonce = request.nonce # Fix #8: Validate address if not validate_address(address): raise HTTPException( status_code=400, detail={"error": "invalid_address", "message": "Invalid Cardano address format"} ) # Fix #1: Atomic nonce check-and-delete using pipeline challenge_key = f"auth_challenge:{address}:{nonce}" async with redis_client.pipeline(transaction=True) as pipe: await pipe.get(challenge_key) await pipe.delete(challenge_key) challenge_data, deleted = await pipe.execute() if not challenge_data or not deleted: raise HTTPException( status_code=400, detail={"error": "invalid_nonce", "message": "Nonce expired or already used"} ) # Verify CIP-8 signature if not verify_cip8_signature(address, nonce, request.signature, request.key): raise HTTPException( status_code=401, detail={"error": "invalid_signature", "message": "Signature verification failed"} ) # Get TRP balance trp_balance = await get_trp_balance(address) tier = get_tier_from_trp_balance(trp_balance) # Don't issue key for anonymous tier if tier == "anonymous": raise HTTPException( status_code=403, detail={ "error": "insufficient_trp", "message": f"Minimum {TRP_TIERS['standard']} TRP required for API key", "trp_balance": trp_balance, "required": TRP_TIERS["standard"] } ) # Generate API key new_key = f"capi_{secrets.token_hex(24)}" # Fix #3: Set expiry 48 hours from now for TRP-gated keys expires_at = datetime.now(timezone.utc) + timedelta(hours=TRP_KEY_EXPIRY_HOURS) # Fix #4: Store using hashed key, not raw key key_hash = hash_api_key(new_key) await redis_client.hset(f"apikey:{key_hash}", mapping={ "label": f"TRP-gated:{address[:20]}...", "tier": tier, "owner": address, "trp_balance": str(trp_balance), "created_at": datetime.now(timezone.utc).isoformat(), "expires_at": expires_at.isoformat() # Fix #3: Add expiry }) logger.info(f"Issued {tier} API key for {address[:20]}... (TRP: {trp_balance}, expires: {expires_at.isoformat()})") # Return the raw key to the user (only time it's exposed) return AuthVerifyResponse( api_key=new_key, tier=tier, trp_balance=trp_balance ) @app.post("/v1/auth/refresh") async def refresh_auth( x_api_key: str = Header(..., description="API key to refresh"), auth: dict = Depends(get_auth_context) ): """ Re-check TRP balance and upgrade/downgrade tier for an existing key. Fix #6: This endpoint is self-service only. The API key in the header is both the authentication AND the key being refreshed. You cannot use key A to refresh key B. """ if auth["tier"] == "anonymous": raise HTTPException( status_code=401, detail={"error": "unauthorized", "message": "Valid API key required"} ) if auth["tier"] == "master": raise HTTPException( status_code=400, detail={"error": "invalid_request", "message": "Cannot refresh master key"} ) owner = auth.get("owner") if not owner: raise HTTPException( status_code=400, detail={"error": "not_trp_gated", "message": "This key is not TRP-gated"} ) # Get current TRP balance trp_balance = await get_trp_balance(owner) new_tier = get_tier_from_trp_balance(trp_balance) old_tier = auth["tier"] # Fix #3: Reset expiry on refresh expires_at = datetime.now(timezone.utc) + timedelta(hours=TRP_KEY_EXPIRY_HOURS) # Use hashed key for storage (Fix #4) key_hash = hash_api_key(x_api_key) # Update key info await redis_client.hset(f"apikey:{key_hash}", mapping={ "tier": new_tier, "trp_balance": str(trp_balance), "expires_at": expires_at.isoformat() # Reset expiry }) tier_changed = new_tier != old_tier message = f"Tier {'upgraded' if new_tier > old_tier else 'downgraded'} from {old_tier} to {new_tier}" if tier_changed else "Tier unchanged" return { "api_key": x_api_key, "tier": new_tier, "previous_tier": old_tier, "trp_balance": trp_balance, "tier_changed": tier_changed, "expires_at": expires_at.isoformat(), "message": message } # ============ Block Endpoints ============ @app.get("/v1/block/latest") async def get_latest_block(auth: dict = Depends(get_auth_context)): """Get the latest block.""" cache_key = "block_latest" cached = await get_cached(cache_key) if cached: return cached async with db_pool.acquire() as conn: block = await conn.fetchrow(""" SELECT b.block_no, b.epoch_no, b.slot_no, encode(b.hash, 'hex') as hash, b.tx_count, b.time FROM block b ORDER BY b.id DESC LIMIT 1 """) if not block: raise HTTPException(status_code=503, detail={"error": "db_sync_not_ready", "message": "No blocks synced yet"}) result = { "block_no": block["block_no"], "epoch_no": block["epoch_no"], "slot_no": block["slot_no"], "hash": block["hash"], "tx_count": block["tx_count"], "time": block["time"].isoformat() if block["time"] else None } await set_cached(cache_key, result, CACHE_TTLS["block_latest"]) return result @app.get("/v1/block/{block_no}") async def get_block(block_no: int, auth: dict = Depends(get_auth_context)): """Get a specific block by number.""" cache_key = f"block_{block_no}" cached = await get_cached(cache_key) if cached: return cached async with db_pool.acquire() as conn: block = await conn.fetchrow(""" SELECT b.block_no, b.epoch_no, b.slot_no, encode(b.hash, 'hex') as hash, b.tx_count, b.time FROM block b WHERE b.block_no = $1 """, block_no) if not block: raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Block {block_no} not found"}) result = { "block_no": block["block_no"], "epoch_no": block["epoch_no"], "slot_no": block["slot_no"], "hash": block["hash"], "tx_count": block["tx_count"], "time": block["time"].isoformat() if block["time"] else None } await set_cached(cache_key, result, CACHE_TTLS["tx_details"]) # Blocks are immutable return result # ============ Address Endpoints ============ @app.get("/v1/address/{address}/balance") async def get_address_balance(address: str, auth: dict = Depends(get_auth_context)): """Get address balance including native tokens.""" # Fix #8: Validate address if not validate_address(address): raise HTTPException( status_code=400, detail={"error": "invalid_address", "message": "Invalid Cardano address format"} ) cache_key = f"balance_{address}" cached = await get_cached(cache_key) if cached: return cached async with db_pool.acquire() as conn: # Get ADA balance from UTxOs balance = await conn.fetchrow(""" SELECT COALESCE(SUM(value), 0) as lovelace FROM tx_out txo JOIN tx ON tx.id = txo.tx_id LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index WHERE txo.address = $1 AND txi.id IS NULL """, address) # Get native tokens tokens = await conn.fetch(""" SELECT encode(ma.policy, 'hex') as policy_id, encode(ma.name, 'hex') as asset_name_hex, convert_from(ma.name, 'UTF8') as asset_name, ma.fingerprint, COALESCE(SUM(mto.quantity), 0) as quantity FROM ma_tx_out mto JOIN multi_asset ma ON ma.id = mto.ident JOIN tx_out txo ON txo.id = mto.tx_out_id LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index WHERE txo.address = $1 AND txi.id IS NULL GROUP BY ma.id, ma.policy, ma.name, ma.fingerprint HAVING SUM(mto.quantity) > 0 """, address) lovelace = int(balance["lovelace"]) if balance else 0 result = { "address": address, "lovelace": lovelace, "ada": lovelace / 1_000_000, "tokens": [ { "policy_id": t["policy_id"], "asset_name": t["asset_name"] or t["asset_name_hex"], "asset_name_hex": t["asset_name_hex"], "fingerprint": t["fingerprint"], "quantity": int(t["quantity"]) } for t in tokens ] } await set_cached(cache_key, result, CACHE_TTLS["balance"]) return result @app.get("/v1/address/{address}/tokens") async def get_address_tokens(address: str, auth: dict = Depends(get_auth_context)): """Get native tokens held by an address.""" # Fix #8: Validate address if not validate_address(address): raise HTTPException( status_code=400, detail={"error": "invalid_address", "message": "Invalid Cardano address format"} ) cache_key = f"tokens_{address}" cached = await get_cached(cache_key) if cached: return cached async with db_pool.acquire() as conn: tokens = await conn.fetch(""" SELECT encode(ma.policy, 'hex') as policy_id, encode(ma.name, 'hex') as asset_name_hex, convert_from(ma.name, 'UTF8') as asset_name, ma.fingerprint, COALESCE(SUM(mto.quantity), 0) as quantity FROM ma_tx_out mto JOIN multi_asset ma ON ma.id = mto.ident JOIN tx_out txo ON txo.id = mto.tx_out_id LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index WHERE txo.address = $1 AND txi.id IS NULL GROUP BY ma.id, ma.policy, ma.name, ma.fingerprint HAVING SUM(mto.quantity) > 0 ORDER BY quantity DESC """, address) result = { "address": address, "tokens": [ { "policy_id": t["policy_id"], "asset_name": t["asset_name"] or t["asset_name_hex"], "asset_name_hex": t["asset_name_hex"], "fingerprint": t["fingerprint"], "quantity": int(t["quantity"]) } for t in tokens ] } await set_cached(cache_key, result, CACHE_TTLS["tokens"]) return result @app.get("/v1/address/{address}/transactions") async def get_address_transactions( address: str, page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100), order: str = Query("desc", regex="^(asc|desc)$"), auth: dict = Depends(get_auth_context) ): """Get transactions for an address.""" # Fix #8: Validate address if not validate_address(address): raise HTTPException( status_code=400, detail={"error": "invalid_address", "message": "Invalid Cardano address format"} ) cache_key = f"txs_{address}_{page}_{limit}_{order}" cached = await get_cached(cache_key) if cached: return cached offset = (page - 1) * limit order_dir = "DESC" if order == "desc" else "ASC" async with db_pool.acquire() as conn: # Count total transactions count_result = await conn.fetchrow(""" SELECT COUNT(DISTINCT tx.id) as total FROM tx JOIN tx_out ON tx_out.tx_id = tx.id WHERE tx_out.address = $1 UNION SELECT COUNT(DISTINCT tx.id) as total FROM tx JOIN tx_in ON tx_in.tx_in_id = tx.id JOIN tx_out ON tx_out.tx_id = tx_in.tx_out_id AND tx_out.index = tx_in.tx_out_index WHERE tx_out.address = $1 """, address) # Get transactions txs = await conn.fetch(f""" WITH addr_txs AS ( SELECT DISTINCT tx.id, tx.hash, tx.fee, b.block_no, b.time FROM tx JOIN block b ON b.id = tx.block_id JOIN tx_out ON tx_out.tx_id = tx.id WHERE tx_out.address = $1 UNION SELECT DISTINCT tx.id, tx.hash, tx.fee, b.block_no, b.time FROM tx JOIN block b ON b.id = tx.block_id JOIN tx_in ON tx_in.tx_in_id = tx.id JOIN tx_out ON tx_out.tx_id = tx_in.tx_out_id AND tx_out.index = tx_in.tx_out_index WHERE tx_out.address = $1 ) SELECT * FROM addr_txs ORDER BY time {order_dir} LIMIT $2 OFFSET $3 """, address, limit, offset) result = { "address": address, "page": page, "limit": limit, "transactions": [ { "tx_hash": tx["hash"].hex() if isinstance(tx["hash"], bytes) else tx["hash"], "block_no": tx["block_no"], "block_time": tx["time"].isoformat() if tx["time"] else None, "fee": int(tx["fee"]) if tx["fee"] else 0 } for tx in txs ] } await set_cached(cache_key, result, CACHE_TTLS["transactions"]) return result # ============ Transaction Endpoints ============ @app.get("/v1/tx/{tx_hash}") async def get_transaction(tx_hash: str, auth: dict = Depends(get_auth_context)): """Get transaction details by hash.""" # Fix #8: Validate tx hash if not validate_tx_hash(tx_hash): raise HTTPException( status_code=400, detail={"error": "invalid_tx_hash", "message": "Invalid transaction hash format (expected 64 hex chars)"} ) cache_key = f"tx_{tx_hash}" cached = await get_cached(cache_key) if cached: return cached # Clean up hash (remove 0x prefix if present) clean_hash = tx_hash.lower().replace("0x", "") async with db_pool.acquire() as conn: tx = await conn.fetchrow(""" SELECT tx.id, encode(tx.hash, 'hex') as hash, tx.fee, b.block_no, b.time, b.epoch_no FROM tx JOIN block b ON b.id = tx.block_id WHERE tx.hash = decode($1, 'hex') """, clean_hash) if not tx: raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Transaction {tx_hash} not found"}) # Get inputs inputs = await conn.fetch(""" SELECT txo.address, txo.value, encode(tx_src.hash, 'hex') as source_tx_hash, txo.index as source_tx_index FROM tx_in txi JOIN tx_out txo ON txo.tx_id = txi.tx_out_id AND txo.index = txi.tx_out_index JOIN tx tx_src ON tx_src.id = txi.tx_out_id WHERE txi.tx_in_id = $1 """, tx["id"]) # Get outputs outputs = await conn.fetch(""" SELECT address, value, index FROM tx_out WHERE tx_id = $1 ORDER BY index """, tx["id"]) # Get metadata metadata = await conn.fetch(""" SELECT key, json FROM tx_metadata WHERE tx_id = $1 """, tx["id"]) result = { "tx_hash": tx["hash"], "block_no": tx["block_no"], "block_time": tx["time"].isoformat() if tx["time"] else None, "epoch_no": tx["epoch_no"], "fee": int(tx["fee"]) if tx["fee"] else 0, "inputs": [ { "address": i["address"], "value": int(i["value"]), "source_tx_hash": i["source_tx_hash"], "source_tx_index": i["source_tx_index"] } for i in inputs ], "outputs": [ { "address": o["address"], "value": int(o["value"]), "index": o["index"] } for o in outputs ], "metadata": {str(m["key"]): m["json"] for m in metadata} if metadata else None } await set_cached(cache_key, result, CACHE_TTLS["tx_details"]) return result # ============ Asset Endpoints ============ @app.get("/v1/asset/{policy_id}/info") async def get_asset_info(policy_id: str, auth: dict = Depends(get_auth_context)): """Get info about all assets under a policy ID.""" # Fix #8: Validate policy ID if not validate_policy_id(policy_id): raise HTTPException( status_code=400, detail={"error": "invalid_policy_id", "message": "Invalid policy ID format (expected 56 hex chars)"} ) cache_key = f"asset_info_{policy_id}" cached = await get_cached(cache_key) if cached: return cached clean_policy = policy_id.lower().replace("0x", "") async with db_pool.acquire() as conn: assets = await conn.fetch(""" SELECT encode(ma.name, 'hex') as asset_name_hex, convert_from(ma.name, 'UTF8') as asset_name, ma.fingerprint, (SELECT COALESCE(SUM(quantity), 0) FROM ma_tx_mint WHERE ident = ma.id) as total_minted, (SELECT COUNT(*) FROM ma_tx_mint WHERE ident = ma.id AND quantity > 0) as mint_count FROM multi_asset ma WHERE ma.policy = decode($1, 'hex') """, clean_policy) if not assets: raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Policy {policy_id} not found"}) result = { "policy_id": policy_id, "assets": [ { "asset_name": a["asset_name"] or a["asset_name_hex"], "asset_name_hex": a["asset_name_hex"], "fingerprint": a["fingerprint"], "total_supply": int(a["total_minted"]) if a["total_minted"] else 0, "mint_count": a["mint_count"] } for a in assets ] } await set_cached(cache_key, result, CACHE_TTLS["asset_info"]) return result @app.get("/v1/asset/{policy_id}/{asset_name}/holders") async def get_asset_holders( policy_id: str, asset_name: str, limit: int = Query(20, ge=1, le=100), auth: dict = Depends(get_auth_context) ): """Get top holders of a specific asset.""" # Fix #8: Validate policy ID if not validate_policy_id(policy_id): raise HTTPException( status_code=400, detail={"error": "invalid_policy_id", "message": "Invalid policy ID format (expected 56 hex chars)"} ) cache_key = f"holders_{policy_id}_{asset_name}_{limit}" cached = await get_cached(cache_key) if cached: return cached clean_policy = policy_id.lower().replace("0x", "") async with db_pool.acquire() as conn: # Get asset ID asset = await conn.fetchrow(""" SELECT id FROM multi_asset WHERE policy = decode($1, 'hex') AND name = decode($2, 'hex') """, clean_policy, asset_name) if not asset: raise HTTPException(status_code=404, detail={"error": "not_found", "message": "Asset not found"}) # Get holders holders = await conn.fetch(""" SELECT txo.address, SUM(mto.quantity) as quantity FROM ma_tx_out mto JOIN tx_out txo ON txo.id = mto.tx_out_id LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index WHERE mto.ident = $1 AND txi.id IS NULL GROUP BY txo.address HAVING SUM(mto.quantity) > 0 ORDER BY quantity DESC LIMIT $2 """, asset["id"], limit) # Count total holders holder_count = await conn.fetchrow(""" SELECT COUNT(DISTINCT txo.address) as count FROM ma_tx_out mto JOIN tx_out txo ON txo.id = mto.tx_out_id LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index WHERE mto.ident = $1 AND txi.id IS NULL GROUP BY txo.address HAVING SUM(mto.quantity) > 0 """, asset["id"]) result = { "policy_id": policy_id, "asset_name": asset_name, "holder_count": holder_count["count"] if holder_count else 0, "holders": [ {"address": h["address"], "quantity": int(h["quantity"])} for h in holders ] } await set_cached(cache_key, result, CACHE_TTLS["asset_info"]) return result # ============ Pool Endpoints ============ @app.get("/v1/pool/{pool_id}/info") async def get_pool_info(pool_id: str, auth: dict = Depends(get_auth_context)): """Get stake pool information.""" cache_key = f"pool_{pool_id}" cached = await get_cached(cache_key) if cached: return cached async with db_pool.acquire() as conn: # Try to find pool by bech32 id or hash pool = await conn.fetchrow(""" SELECT ph.view as pool_id, encode(ph.hash_raw, 'hex') as pool_hash, pod.ticker_name as ticker, pod.json->>'name' as name, pod.json->>'description' as description, pu.pledge, pu.margin, (SELECT COALESCE(SUM(amount), 0) FROM epoch_stake WHERE pool_id = ph.id AND epoch_no = (SELECT MAX(epoch_no) FROM epoch_stake WHERE pool_id = ph.id)) as active_stake, (SELECT COUNT(*) FROM block WHERE slot_leader_id IN (SELECT id FROM slot_leader WHERE pool_hash_id = ph.id)) as block_count FROM pool_hash ph LEFT JOIN pool_offline_data pod ON pod.pool_id = ph.id LEFT JOIN pool_update pu ON pu.hash_id = ph.id AND pu.registered_tx_id = (SELECT MAX(registered_tx_id) FROM pool_update WHERE hash_id = ph.id) WHERE ph.view = $1 OR encode(ph.hash_raw, 'hex') = $1 """, pool_id.lower()) if not pool: raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Pool {pool_id} not found"}) result = { "pool_id": pool["pool_id"], "pool_hash": pool["pool_hash"], "ticker": pool["ticker"], "name": pool["name"], "description": pool["description"], "pledge": int(pool["pledge"]) if pool["pledge"] else 0, "margin": float(pool["margin"]) if pool["margin"] else 0, "active_stake": int(pool["active_stake"]) if pool["active_stake"] else 0, "block_count": pool["block_count"] or 0 } await set_cached(cache_key, result, CACHE_TTLS["pool_info"]) return result # ============ Admin Endpoints ============ @app.post("/admin/keys") async def create_api_key(key_data: APIKeyCreate, auth: dict = Depends(require_master_key)): """Create a new API key (admin-created keys don't expire unless explicitly set).""" new_key = f"capi_{secrets.token_hex(24)}" # Fix #4: Store using hashed key key_hash = hash_api_key(new_key) # Admin-created keys don't expire by default (Fix #10) await redis_client.hset(f"apikey:{key_hash}", mapping={ "label": key_data.label, "tier": key_data.tier, "owner": key_data.owner or "", "trp_balance": str(key_data.trp_balance or 0), "created_at": datetime.now(timezone.utc).isoformat() # No expires_at for admin-created keys }) return APIKeyResponse( key=new_key, # Return raw key only once label=key_data.label, tier=key_data.tier, owner=key_data.owner, created_at=datetime.now(timezone.utc).isoformat() ) @app.delete("/admin/keys/{key}") async def revoke_api_key(key: str, auth: dict = Depends(require_master_key)): """Revoke an API key.""" # Fix #4: Hash the key for lookup key_hash = hash_api_key(key) deleted = await redis_client.delete(f"apikey:{key_hash}") if not deleted: raise HTTPException(status_code=404, detail={"error": "not_found", "message": "API key not found"}) return {"status": "revoked", "key": key[:16] + "..."} @app.get("/admin/keys") async def list_api_keys(auth: dict = Depends(require_master_key)): """List all API keys (shows metadata, not raw keys since we only store hashes).""" keys = [] cursor = 0 while True: cursor, found_keys = await redis_client.scan(cursor, match="apikey:*", count=100) for key in found_keys: data = await redis_client.hgetall(key) if data: # Key is stored as hash, so we show the hash (last 8 chars) for identification key_hash = key.replace("apikey:", "") keys.append({ "key_hash_suffix": f"...{key_hash[-8:]}", "label": data.get("label"), "tier": data.get("tier"), "owner": data.get("owner") or None, "trp_balance": int(data.get("trp_balance", 0)), "created_at": data.get("created_at"), "expires_at": data.get("expires_at") }) if cursor == 0: break return {"keys": keys, "count": len(keys)} @app.post("/admin/refresh-tiers") async def admin_refresh_tiers(auth: dict = Depends(require_master_key)): """Manually trigger TRP tier refresh for all keys.""" await refresh_all_trp_tiers() return {"status": "ok", "message": "TRP tier refresh completed"} @app.get("/admin/stats") async def get_admin_stats(auth: dict = Depends(require_master_key)): """Get API usage statistics.""" # Count rate limit keys (active users in current minute) current_minute = int(time.time() // 60) active_users = 0 cursor = 0 while True: cursor, keys = await redis_client.scan(cursor, match=f"ratelimit:general:*:{current_minute}", count=100) active_users += len(keys) if cursor == 0: break # Count API keys by tier api_key_counts = {"standard": 0, "elevated": 0, "total": 0} cursor = 0 while True: cursor, keys = await redis_client.scan(cursor, match="apikey:*", count=100) for key in keys: tier = await redis_client.hget(key, "tier") api_key_counts["total"] += 1 if tier in api_key_counts: api_key_counts[tier] += 1 if cursor == 0: break # Get db-sync status async with db_pool.acquire() as conn: latest_block = await conn.fetchrow("SELECT block_no, time FROM block ORDER BY id DESC LIMIT 1") tx_count = await conn.fetchrow("SELECT COUNT(*) as count FROM tx") # Check node status node_connected = os.path.exists(CARDANO_NODE_SOCKET_PATH) return { "active_users_this_minute": active_users, "api_keys": api_key_counts, "db_sync_block": latest_block["block_no"] if latest_block else 0, "db_sync_time": latest_block["time"].isoformat() if latest_block and latest_block["time"] else None, "total_transactions_synced": tx_count["count"] if tx_count else 0, "node_socket_exists": node_connected } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8765)