commit 104e11f0986d0472d0c8d48f07a4afa1a3177a83 Author: Kayos Date: Wed Mar 18 11:43:46 2026 -0700 Initial commit: Cardano chain data REST API diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f00fe44 --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +env/ +venv/ +.env +*.egg-info/ +.pytest_cache/ +.mypy_cache/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..e54b151 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.12-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY . . +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8765", "--workers", "2"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..1c9b728 --- /dev/null +++ b/README.md @@ -0,0 +1,142 @@ +# Cardano Chain Data API + +REST API for querying Cardano blockchain data via db-sync PostgreSQL database. + +## Stack + +- **FastAPI** — async Python API framework +- **asyncpg** — async PostgreSQL driver +- **Redis** — rate limiting + response caching + +## Deployment + +```bash +cd /opt/cardano/dbsync +sudo docker compose up -d --build +``` + +API runs on `127.0.0.1:8765` (localhost only, VPN access via future rebind to `192.168.254.105:8765`). + +## Authentication + +### Rate Limits +- Anonymous (no key): 20 req/min per IP +- Standard API key: 100 req/min +- Elevated API key: 1000 req/min +- Master key: unlimited + +### Using API Keys +```bash +# Header (preferred) +curl -H "X-API-Key: capi_xxx" http://127.0.0.1:8765/v1/block/latest + +# Query param +curl http://127.0.0.1:8765/v1/block/latest?api_key=capi_xxx +``` + +## Endpoints + +### Sync Status +``` +GET /v1/sync/status +``` + +### Blocks +``` +GET /v1/block/latest +GET /v1/block/{block_no} +``` + +### Addresses +``` +GET /v1/address/{address}/balance +GET /v1/address/{address}/tokens +GET /v1/address/{address}/transactions?page=1&limit=20&order=desc +``` + +### Transactions +``` +GET /v1/tx/{tx_hash} +``` + +### Assets +``` +GET /v1/asset/{policy_id}/info +GET /v1/asset/{policy_id}/{asset_name}/holders?limit=20 +``` + +### Pools +``` +GET /v1/pool/{pool_id}/info +``` + +### Admin (master key required) +``` +POST /admin/keys — create API key +DELETE /admin/keys/{key} — revoke key +GET /admin/keys — list all keys +GET /admin/stats — usage stats +``` + +## Known Policy IDs + +- **TRP**: `9c4bd4a90cdb73d9ff681215ecf7dea9fb183d916d30487d17098e05` +- **MAP**: `24bd9e7b9ae3a61df79eca72fd8355d0f7767e4c55a04a0d919c019c` + +## Future: TRP Token Gating + +Design for decentralized, permissionless API access based on TRP token holdings: + +### Tier Mapping +- 0 TRP → anonymous rate limits (20 req/min) +- 50+ TRP → standard tier (100 req/min) +- 500+ TRP → elevated tier (1000 req/min) + +### Implementation Plan +1. `POST /admin/keys/verify-trp` endpoint +2. Takes Cardano address, queries `/v1/address/{addr}/tokens` +3. Checks TRP policy balance +4. Auto-creates or upgrades API key based on holdings +5. Stores `owner` (address) and `trp_balance` in key hash + +### Data Model (already in place) +API keys stored in Redis as: +``` +apikey: → { + tier: "standard"|"elevated", + label: "...", + owner: "addr1...", # Cardano address + trp_balance: 500, # Last verified TRP balance + created_at: "..." +} +``` + +### Benefits +- **Permissionless**: Anyone can verify holdings and get access +- **Decentralized**: No manual approval needed +- **Incentivized**: Holding TRP = better API access +- **Revocable**: Re-verify periodically to maintain tier + +## Caching + +Response TTLs: +- Balance/tokens: 60s +- Transactions: 30s +- Latest block: 10s +- TX details: 300s (immutable) +- Asset info: 120s +- Pool info: 120s +- Sync status: 5s + +## Environment Variables + +``` +DB_HOST=postgres-dbsync +DB_PORT=5432 +DB_NAME=cexplorer +DB_USER=dbsync +DB_PASS=... +REDIS_HOST=redis-api +REDIS_PORT=6379 +API_MASTER_KEY=capi_... +``` diff --git a/main.py b/main.py new file mode 100644 index 0000000..92e000d --- /dev/null +++ b/main.py @@ -0,0 +1,878 @@ +""" +Cardano Chain Data REST API +Sits in front of cardano-db-sync PostgreSQL database. + +Known policy IDs: +- TRP: 9c4bd4a90cdb73d9ff681215ecf7dea9fb183d916d30487d17098e05 +- MAP: 24bd9e7b9ae3a61df79eca72fd8355d0f7767e4c55a04a0d919c019c + +Future TRP gating design: +- POST /admin/keys/verify-trp — check address TRP balance, auto-upgrade tier +- 0 TRP → anonymous rate limits (20 req/min) +- 50+ TRP → standard tier (100 req/min) +- 500+ TRP → elevated tier (1000 req/min) +- Verification: query /v1/address/{addr}/tokens, check TRP policy balance +- Makes API access decentralized and permissionless — hold tokens, get access +""" + +import os +import json +import hashlib +import secrets +import time +import logging +from datetime import datetime, timezone +from typing import Optional +from contextlib import asynccontextmanager + +from fastapi import FastAPI, Request, HTTPException, Query, Header, Depends +from fastapi.responses import JSONResponse +from pydantic import BaseModel +import asyncpg +import redis.asyncio as redis + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# Environment config +DB_HOST = os.getenv("DB_HOST", "postgres-dbsync") +DB_PORT = int(os.getenv("DB_PORT", "5432")) +DB_NAME = os.getenv("DB_NAME", "cexplorer") +DB_USER = os.getenv("DB_USER", "dbsync") +DB_PASS = os.getenv("DB_PASS", "") +REDIS_HOST = os.getenv("REDIS_HOST", "redis-api") +REDIS_PORT = int(os.getenv("REDIS_PORT", "6379")) +API_MASTER_KEY = os.getenv("API_MASTER_KEY", "") + +# Rate limits (requests per minute) +RATE_LIMITS = { + "anonymous": 20, + "standard": 100, + "elevated": 1000, + "master": float("inf") +} + +# Cache TTLs (seconds) +CACHE_TTLS = { + "balance": 60, + "tokens": 60, + "transactions": 30, + "block_latest": 10, + "tx_details": 300, + "asset_info": 120, + "pool_info": 120, + "sync_status": 5 +} + +# Global connections +db_pool: Optional[asyncpg.Pool] = None +redis_client: Optional[redis.Redis] = None + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Manage database and redis connections.""" + global db_pool, redis_client + + # Connect to PostgreSQL + logger.info(f"Connecting to PostgreSQL at {DB_HOST}:{DB_PORT}/{DB_NAME}") + db_pool = await asyncpg.create_pool( + host=DB_HOST, + port=DB_PORT, + database=DB_NAME, + user=DB_USER, + password=DB_PASS, + min_size=2, + max_size=10 + ) + + # Connect to Redis + logger.info(f"Connecting to Redis at {REDIS_HOST}:{REDIS_PORT}") + redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) + + logger.info("Cardano API started successfully") + yield + + # Cleanup + if db_pool: + await db_pool.close() + if redis_client: + await redis_client.close() + + +app = FastAPI( + title="Cardano Chain Data API", + description="REST API for querying Cardano blockchain data via db-sync", + version="1.0.0", + lifespan=lifespan +) + + +# ============ Models ============ + +class APIKeyCreate(BaseModel): + label: str + tier: str = "standard" # standard or elevated + owner: Optional[str] = None # Cardano address for future TRP verification + trp_balance: Optional[int] = 0 # For future TRP-gating + + +class APIKeyResponse(BaseModel): + key: str + label: str + tier: str + owner: Optional[str] + created_at: str + + +# ============ Helpers ============ + +async def get_api_key_info(key: str) -> Optional[dict]: + """Get API key info from Redis.""" + if not redis_client: + return None + data = await redis_client.hgetall(f"apikey:{key}") + return data if data else None + + +async def check_rate_limit(identifier: str, tier: str) -> tuple[bool, int]: + """Check rate limit. Returns (allowed, retry_after_seconds).""" + if tier == "master": + return True, 0 + + limit = RATE_LIMITS.get(tier, RATE_LIMITS["anonymous"]) + key = f"ratelimit:{identifier}:{int(time.time() // 60)}" + + current = await redis_client.incr(key) + if current == 1: + await redis_client.expire(key, 60) + + if current > limit: + return False, 60 - (int(time.time()) % 60) + return True, 0 + + +async def get_cached(cache_key: str) -> Optional[dict]: + """Get cached response.""" + data = await redis_client.get(f"cache:{cache_key}") + if data: + return json.loads(data) + return None + + +async def set_cached(cache_key: str, data: dict, ttl: int): + """Cache a response.""" + await redis_client.setex(f"cache:{cache_key}", ttl, json.dumps(data)) + + +def get_client_ip(request: Request) -> str: + """Extract client IP from request.""" + forwarded = request.headers.get("X-Forwarded-For") + if forwarded: + return forwarded.split(",")[0].strip() + return request.client.host if request.client else "unknown" + + +# ============ Auth Dependency ============ + +async def get_auth_context( + request: Request, + x_api_key: Optional[str] = Header(None), + api_key: Optional[str] = Query(None) +) -> dict: + """Extract authentication context from request.""" + key = x_api_key or api_key + client_ip = get_client_ip(request) + + if key: + if key == API_MASTER_KEY: + return {"tier": "master", "identifier": "master", "label": "master"} + + key_info = await get_api_key_info(key) + if key_info: + return { + "tier": key_info.get("tier", "standard"), + "identifier": key, + "label": key_info.get("label", "unknown") + } + + return {"tier": "anonymous", "identifier": client_ip, "label": None} + + +async def require_master_key(auth: dict = Depends(get_auth_context)): + """Require master key for admin endpoints.""" + if auth["tier"] != "master": + raise HTTPException(status_code=403, detail="Master key required") + return auth + + +# ============ Middleware ============ + +@app.middleware("http") +async def rate_limit_middleware(request: Request, call_next): + """Rate limiting and request logging middleware.""" + start_time = time.time() + + # Skip rate limiting for health check + if request.url.path == "/health": + return await call_next(request) + + # Get auth context + x_api_key = request.headers.get("X-API-Key") + api_key = request.query_params.get("api_key") + key = x_api_key or api_key + client_ip = get_client_ip(request) + + tier = "anonymous" + identifier = client_ip + label = None + + if key: + if key == API_MASTER_KEY: + tier = "master" + identifier = "master" + label = "master" + else: + key_info = await get_api_key_info(key) + if key_info: + tier = key_info.get("tier", "standard") + identifier = key + label = key_info.get("label", "unknown") + + # Check rate limit + allowed, retry_after = await check_rate_limit(identifier, tier) + if not allowed: + logger.warning(f"Rate limit exceeded: {identifier} ({tier})") + return JSONResponse( + status_code=429, + content={"error": "rate_limit_exceeded", "retry_after": retry_after} + ) + + # Process request + response = await call_next(request) + + # Log request + elapsed = int((time.time() - start_time) * 1000) + logger.info(f"{request.method} {request.url.path} | {label or client_ip} | {elapsed}ms | {response.status_code}") + + return response + + +# ============ Health & Sync ============ + +@app.get("/health") +async def health_check(): + """Basic health check.""" + return {"status": "ok"} + + +@app.get("/v1/sync/status") +async def get_sync_status(auth: dict = Depends(get_auth_context)): + """Get db-sync synchronization status.""" + cache_key = "sync_status" + cached = await get_cached(cache_key) + if cached: + return cached + + async with db_pool.acquire() as conn: + # Get latest block in db-sync + db_tip = await conn.fetchrow(""" + SELECT block_no, epoch_no, slot_no, time + FROM block ORDER BY id DESC LIMIT 1 + """) + + # Get meta info for sync percentage + meta = await conn.fetchrow("SELECT * FROM meta LIMIT 1") + + if not db_tip: + return {"error": "db_sync_not_ready", "message": "No blocks synced yet"} + + result = { + "db_sync_tip": db_tip["block_no"], + "epoch_no": db_tip["epoch_no"], + "slot_no": db_tip["slot_no"], + "last_block_time": db_tip["time"].isoformat() if db_tip["time"] else None, + "is_syncing": True # Assume syncing unless we have better info + } + + await set_cached(cache_key, result, CACHE_TTLS["sync_status"]) + return result + + +# ============ Block Endpoints ============ + +@app.get("/v1/block/latest") +async def get_latest_block(auth: dict = Depends(get_auth_context)): + """Get the latest block.""" + cache_key = "block_latest" + cached = await get_cached(cache_key) + if cached: + return cached + + async with db_pool.acquire() as conn: + block = await conn.fetchrow(""" + SELECT b.block_no, b.epoch_no, b.slot_no, + encode(b.hash, 'hex') as hash, + b.tx_count, b.time + FROM block b + ORDER BY b.id DESC LIMIT 1 + """) + + if not block: + raise HTTPException(status_code=503, detail={"error": "db_sync_not_ready", "message": "No blocks synced yet"}) + + result = { + "block_no": block["block_no"], + "epoch_no": block["epoch_no"], + "slot_no": block["slot_no"], + "hash": block["hash"], + "tx_count": block["tx_count"], + "time": block["time"].isoformat() if block["time"] else None + } + + await set_cached(cache_key, result, CACHE_TTLS["block_latest"]) + return result + + +@app.get("/v1/block/{block_no}") +async def get_block(block_no: int, auth: dict = Depends(get_auth_context)): + """Get a specific block by number.""" + cache_key = f"block_{block_no}" + cached = await get_cached(cache_key) + if cached: + return cached + + async with db_pool.acquire() as conn: + block = await conn.fetchrow(""" + SELECT b.block_no, b.epoch_no, b.slot_no, + encode(b.hash, 'hex') as hash, + b.tx_count, b.time + FROM block b + WHERE b.block_no = $1 + """, block_no) + + if not block: + raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Block {block_no} not found"}) + + result = { + "block_no": block["block_no"], + "epoch_no": block["epoch_no"], + "slot_no": block["slot_no"], + "hash": block["hash"], + "tx_count": block["tx_count"], + "time": block["time"].isoformat() if block["time"] else None + } + + await set_cached(cache_key, result, CACHE_TTLS["tx_details"]) # Blocks are immutable + return result + + +# ============ Address Endpoints ============ + +@app.get("/v1/address/{address}/balance") +async def get_address_balance(address: str, auth: dict = Depends(get_auth_context)): + """Get address balance including native tokens.""" + cache_key = f"balance_{address}" + cached = await get_cached(cache_key) + if cached: + return cached + + async with db_pool.acquire() as conn: + # Get ADA balance from UTxOs + balance = await conn.fetchrow(""" + SELECT COALESCE(SUM(value), 0) as lovelace + FROM tx_out txo + JOIN tx ON tx.id = txo.tx_id + LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index + WHERE txo.address = $1 AND txi.id IS NULL + """, address) + + # Get native tokens + tokens = await conn.fetch(""" + SELECT + encode(ma.policy, 'hex') as policy_id, + encode(ma.name, 'hex') as asset_name_hex, + convert_from(ma.name, 'UTF8') as asset_name, + ma.fingerprint, + COALESCE(SUM(mto.quantity), 0) as quantity + FROM ma_tx_out mto + JOIN multi_asset ma ON ma.id = mto.ident + JOIN tx_out txo ON txo.id = mto.tx_out_id + LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index + WHERE txo.address = $1 AND txi.id IS NULL + GROUP BY ma.id, ma.policy, ma.name, ma.fingerprint + HAVING SUM(mto.quantity) > 0 + """, address) + + lovelace = int(balance["lovelace"]) if balance else 0 + + result = { + "address": address, + "lovelace": lovelace, + "ada": lovelace / 1_000_000, + "tokens": [ + { + "policy_id": t["policy_id"], + "asset_name": t["asset_name"] or t["asset_name_hex"], + "asset_name_hex": t["asset_name_hex"], + "fingerprint": t["fingerprint"], + "quantity": int(t["quantity"]) + } + for t in tokens + ] + } + + await set_cached(cache_key, result, CACHE_TTLS["balance"]) + return result + + +@app.get("/v1/address/{address}/tokens") +async def get_address_tokens(address: str, auth: dict = Depends(get_auth_context)): + """Get native tokens held by an address.""" + cache_key = f"tokens_{address}" + cached = await get_cached(cache_key) + if cached: + return cached + + async with db_pool.acquire() as conn: + tokens = await conn.fetch(""" + SELECT + encode(ma.policy, 'hex') as policy_id, + encode(ma.name, 'hex') as asset_name_hex, + convert_from(ma.name, 'UTF8') as asset_name, + ma.fingerprint, + COALESCE(SUM(mto.quantity), 0) as quantity + FROM ma_tx_out mto + JOIN multi_asset ma ON ma.id = mto.ident + JOIN tx_out txo ON txo.id = mto.tx_out_id + LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index + WHERE txo.address = $1 AND txi.id IS NULL + GROUP BY ma.id, ma.policy, ma.name, ma.fingerprint + HAVING SUM(mto.quantity) > 0 + ORDER BY quantity DESC + """, address) + + result = { + "address": address, + "tokens": [ + { + "policy_id": t["policy_id"], + "asset_name": t["asset_name"] or t["asset_name_hex"], + "asset_name_hex": t["asset_name_hex"], + "fingerprint": t["fingerprint"], + "quantity": int(t["quantity"]) + } + for t in tokens + ] + } + + await set_cached(cache_key, result, CACHE_TTLS["tokens"]) + return result + + +@app.get("/v1/address/{address}/transactions") +async def get_address_transactions( + address: str, + page: int = Query(1, ge=1), + limit: int = Query(20, ge=1, le=100), + order: str = Query("desc", regex="^(asc|desc)$"), + auth: dict = Depends(get_auth_context) +): + """Get transactions for an address.""" + cache_key = f"txs_{address}_{page}_{limit}_{order}" + cached = await get_cached(cache_key) + if cached: + return cached + + offset = (page - 1) * limit + order_dir = "DESC" if order == "desc" else "ASC" + + async with db_pool.acquire() as conn: + # Count total transactions + count_result = await conn.fetchrow(""" + SELECT COUNT(DISTINCT tx.id) as total + FROM tx + JOIN tx_out ON tx_out.tx_id = tx.id + WHERE tx_out.address = $1 + UNION + SELECT COUNT(DISTINCT tx.id) as total + FROM tx + JOIN tx_in ON tx_in.tx_in_id = tx.id + JOIN tx_out ON tx_out.tx_id = tx_in.tx_out_id AND tx_out.index = tx_in.tx_out_index + WHERE tx_out.address = $1 + """, address) + + # Get transactions + txs = await conn.fetch(f""" + WITH addr_txs AS ( + SELECT DISTINCT tx.id, tx.hash, tx.fee, b.block_no, b.time + FROM tx + JOIN block b ON b.id = tx.block_id + JOIN tx_out ON tx_out.tx_id = tx.id + WHERE tx_out.address = $1 + UNION + SELECT DISTINCT tx.id, tx.hash, tx.fee, b.block_no, b.time + FROM tx + JOIN block b ON b.id = tx.block_id + JOIN tx_in ON tx_in.tx_in_id = tx.id + JOIN tx_out ON tx_out.tx_id = tx_in.tx_out_id AND tx_out.index = tx_in.tx_out_index + WHERE tx_out.address = $1 + ) + SELECT * FROM addr_txs + ORDER BY time {order_dir} + LIMIT $2 OFFSET $3 + """, address, limit, offset) + + result = { + "address": address, + "page": page, + "limit": limit, + "transactions": [ + { + "tx_hash": tx["hash"].hex() if isinstance(tx["hash"], bytes) else tx["hash"], + "block_no": tx["block_no"], + "block_time": tx["time"].isoformat() if tx["time"] else None, + "fee": int(tx["fee"]) if tx["fee"] else 0 + } + for tx in txs + ] + } + + await set_cached(cache_key, result, CACHE_TTLS["transactions"]) + return result + + +# ============ Transaction Endpoints ============ + +@app.get("/v1/tx/{tx_hash}") +async def get_transaction(tx_hash: str, auth: dict = Depends(get_auth_context)): + """Get transaction details by hash.""" + cache_key = f"tx_{tx_hash}" + cached = await get_cached(cache_key) + if cached: + return cached + + # Clean up hash (remove 0x prefix if present) + clean_hash = tx_hash.lower().replace("0x", "") + + async with db_pool.acquire() as conn: + tx = await conn.fetchrow(""" + SELECT tx.id, encode(tx.hash, 'hex') as hash, tx.fee, + b.block_no, b.time, b.epoch_no + FROM tx + JOIN block b ON b.id = tx.block_id + WHERE tx.hash = decode($1, 'hex') + """, clean_hash) + + if not tx: + raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Transaction {tx_hash} not found"}) + + # Get inputs + inputs = await conn.fetch(""" + SELECT txo.address, txo.value, + encode(tx_src.hash, 'hex') as source_tx_hash, + txo.index as source_tx_index + FROM tx_in txi + JOIN tx_out txo ON txo.tx_id = txi.tx_out_id AND txo.index = txi.tx_out_index + JOIN tx tx_src ON tx_src.id = txi.tx_out_id + WHERE txi.tx_in_id = $1 + """, tx["id"]) + + # Get outputs + outputs = await conn.fetch(""" + SELECT address, value, index + FROM tx_out + WHERE tx_id = $1 + ORDER BY index + """, tx["id"]) + + # Get metadata + metadata = await conn.fetch(""" + SELECT key, json + FROM tx_metadata + WHERE tx_id = $1 + """, tx["id"]) + + result = { + "tx_hash": tx["hash"], + "block_no": tx["block_no"], + "block_time": tx["time"].isoformat() if tx["time"] else None, + "epoch_no": tx["epoch_no"], + "fee": int(tx["fee"]) if tx["fee"] else 0, + "inputs": [ + { + "address": i["address"], + "value": int(i["value"]), + "source_tx_hash": i["source_tx_hash"], + "source_tx_index": i["source_tx_index"] + } + for i in inputs + ], + "outputs": [ + { + "address": o["address"], + "value": int(o["value"]), + "index": o["index"] + } + for o in outputs + ], + "metadata": {str(m["key"]): m["json"] for m in metadata} if metadata else None + } + + await set_cached(cache_key, result, CACHE_TTLS["tx_details"]) + return result + + +# ============ Asset Endpoints ============ + +@app.get("/v1/asset/{policy_id}/info") +async def get_asset_info(policy_id: str, auth: dict = Depends(get_auth_context)): + """Get info about all assets under a policy ID.""" + cache_key = f"asset_info_{policy_id}" + cached = await get_cached(cache_key) + if cached: + return cached + + clean_policy = policy_id.lower().replace("0x", "") + + async with db_pool.acquire() as conn: + assets = await conn.fetch(""" + SELECT + encode(ma.name, 'hex') as asset_name_hex, + convert_from(ma.name, 'UTF8') as asset_name, + ma.fingerprint, + (SELECT COALESCE(SUM(quantity), 0) FROM ma_tx_mint WHERE ident = ma.id) as total_minted, + (SELECT COUNT(*) FROM ma_tx_mint WHERE ident = ma.id AND quantity > 0) as mint_count + FROM multi_asset ma + WHERE ma.policy = decode($1, 'hex') + """, clean_policy) + + if not assets: + raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Policy {policy_id} not found"}) + + result = { + "policy_id": policy_id, + "assets": [ + { + "asset_name": a["asset_name"] or a["asset_name_hex"], + "asset_name_hex": a["asset_name_hex"], + "fingerprint": a["fingerprint"], + "total_supply": int(a["total_minted"]) if a["total_minted"] else 0, + "mint_count": a["mint_count"] + } + for a in assets + ] + } + + await set_cached(cache_key, result, CACHE_TTLS["asset_info"]) + return result + + +@app.get("/v1/asset/{policy_id}/{asset_name}/holders") +async def get_asset_holders( + policy_id: str, + asset_name: str, + limit: int = Query(20, ge=1, le=100), + auth: dict = Depends(get_auth_context) +): + """Get top holders of a specific asset.""" + cache_key = f"holders_{policy_id}_{asset_name}_{limit}" + cached = await get_cached(cache_key) + if cached: + return cached + + clean_policy = policy_id.lower().replace("0x", "") + + async with db_pool.acquire() as conn: + # Get asset ID + asset = await conn.fetchrow(""" + SELECT id FROM multi_asset + WHERE policy = decode($1, 'hex') AND name = decode($2, 'hex') + """, clean_policy, asset_name) + + if not asset: + raise HTTPException(status_code=404, detail={"error": "not_found", "message": "Asset not found"}) + + # Get holders + holders = await conn.fetch(""" + SELECT txo.address, SUM(mto.quantity) as quantity + FROM ma_tx_out mto + JOIN tx_out txo ON txo.id = mto.tx_out_id + LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index + WHERE mto.ident = $1 AND txi.id IS NULL + GROUP BY txo.address + HAVING SUM(mto.quantity) > 0 + ORDER BY quantity DESC + LIMIT $2 + """, asset["id"], limit) + + # Count total holders + holder_count = await conn.fetchrow(""" + SELECT COUNT(DISTINCT txo.address) as count + FROM ma_tx_out mto + JOIN tx_out txo ON txo.id = mto.tx_out_id + LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index + WHERE mto.ident = $1 AND txi.id IS NULL + GROUP BY txo.address + HAVING SUM(mto.quantity) > 0 + """, asset["id"]) + + result = { + "policy_id": policy_id, + "asset_name": asset_name, + "holder_count": holder_count["count"] if holder_count else 0, + "holders": [ + {"address": h["address"], "quantity": int(h["quantity"])} + for h in holders + ] + } + + await set_cached(cache_key, result, CACHE_TTLS["asset_info"]) + return result + + +# ============ Pool Endpoints ============ + +@app.get("/v1/pool/{pool_id}/info") +async def get_pool_info(pool_id: str, auth: dict = Depends(get_auth_context)): + """Get stake pool information.""" + cache_key = f"pool_{pool_id}" + cached = await get_cached(cache_key) + if cached: + return cached + + async with db_pool.acquire() as conn: + # Try to find pool by bech32 id or hash + pool = await conn.fetchrow(""" + SELECT + ph.view as pool_id, + encode(ph.hash_raw, 'hex') as pool_hash, + pod.ticker_name as ticker, + pod.json->>'name' as name, + pod.json->>'description' as description, + pu.pledge, + pu.margin, + (SELECT COALESCE(SUM(amount), 0) FROM epoch_stake + WHERE pool_id = ph.id AND epoch_no = (SELECT MAX(epoch_no) FROM epoch_stake WHERE pool_id = ph.id)) as active_stake, + (SELECT COUNT(*) FROM block WHERE slot_leader_id IN (SELECT id FROM slot_leader WHERE pool_hash_id = ph.id)) as block_count + FROM pool_hash ph + LEFT JOIN pool_offline_data pod ON pod.pool_id = ph.id + LEFT JOIN pool_update pu ON pu.hash_id = ph.id + AND pu.registered_tx_id = (SELECT MAX(registered_tx_id) FROM pool_update WHERE hash_id = ph.id) + WHERE ph.view = $1 OR encode(ph.hash_raw, 'hex') = $1 + """, pool_id.lower()) + + if not pool: + raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Pool {pool_id} not found"}) + + result = { + "pool_id": pool["pool_id"], + "pool_hash": pool["pool_hash"], + "ticker": pool["ticker"], + "name": pool["name"], + "description": pool["description"], + "pledge": int(pool["pledge"]) if pool["pledge"] else 0, + "margin": float(pool["margin"]) if pool["margin"] else 0, + "active_stake": int(pool["active_stake"]) if pool["active_stake"] else 0, + "block_count": pool["block_count"] or 0 + } + + await set_cached(cache_key, result, CACHE_TTLS["pool_info"]) + return result + + +# ============ Admin Endpoints ============ + +@app.post("/admin/keys") +async def create_api_key(key_data: APIKeyCreate, auth: dict = Depends(require_master_key)): + """Create a new API key.""" + new_key = f"capi_{secrets.token_hex(24)}" + + await redis_client.hset(f"apikey:{new_key}", mapping={ + "label": key_data.label, + "tier": key_data.tier, + "owner": key_data.owner or "", + "trp_balance": str(key_data.trp_balance or 0), + "created_at": datetime.now(timezone.utc).isoformat() + }) + + return APIKeyResponse( + key=new_key, + label=key_data.label, + tier=key_data.tier, + owner=key_data.owner, + created_at=datetime.now(timezone.utc).isoformat() + ) + + +@app.delete("/admin/keys/{key}") +async def revoke_api_key(key: str, auth: dict = Depends(require_master_key)): + """Revoke an API key.""" + deleted = await redis_client.delete(f"apikey:{key}") + if not deleted: + raise HTTPException(status_code=404, detail={"error": "not_found", "message": "API key not found"}) + return {"status": "revoked", "key": key} + + +@app.get("/admin/keys") +async def list_api_keys(auth: dict = Depends(require_master_key)): + """List all API keys.""" + keys = [] + cursor = 0 + while True: + cursor, found_keys = await redis_client.scan(cursor, match="apikey:*", count=100) + for key in found_keys: + data = await redis_client.hgetall(key) + if data: + keys.append({ + "key": key.replace("apikey:", ""), + "label": data.get("label"), + "tier": data.get("tier"), + "owner": data.get("owner") or None, + "created_at": data.get("created_at") + }) + if cursor == 0: + break + return {"keys": keys, "count": len(keys)} + + +@app.get("/admin/stats") +async def get_admin_stats(auth: dict = Depends(require_master_key)): + """Get API usage statistics.""" + # Count rate limit keys (active users in current minute) + current_minute = int(time.time() // 60) + active_users = 0 + cursor = 0 + while True: + cursor, keys = await redis_client.scan(cursor, match=f"ratelimit:*:{current_minute}", count=100) + active_users += len(keys) + if cursor == 0: + break + + # Count API keys + api_key_count = 0 + cursor = 0 + while True: + cursor, keys = await redis_client.scan(cursor, match="apikey:*", count=100) + api_key_count += len(keys) + if cursor == 0: + break + + # Get db-sync status + async with db_pool.acquire() as conn: + latest_block = await conn.fetchrow("SELECT block_no, time FROM block ORDER BY id DESC LIMIT 1") + tx_count = await conn.fetchrow("SELECT COUNT(*) as count FROM tx") + + return { + "active_users_this_minute": active_users, + "total_api_keys": api_key_count, + "db_sync_block": latest_block["block_no"] if latest_block else 0, + "db_sync_time": latest_block["time"].isoformat() if latest_block and latest_block["time"] else None, + "total_transactions_synced": tx_count["count"] if tx_count else 0 + } + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8765) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c298ba4 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +fastapi==0.115.0 +uvicorn[standard]==0.30.0 +asyncpg==0.30.0 +redis==5.0.0 +pydantic==2.9.0 +python-dotenv==1.0.0