Initial commit: Cardano chain data REST API

This commit is contained in:
Kayos 2026-03-18 11:43:46 -07:00
commit 104e11f098
5 changed files with 1043 additions and 0 deletions

11
.gitignore vendored Normal file
View file

@ -0,0 +1,11 @@
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
venv/
.env
*.egg-info/
.pytest_cache/
.mypy_cache/

6
Dockerfile Normal file
View file

@ -0,0 +1,6 @@
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8765", "--workers", "2"]

142
README.md Normal file
View file

@ -0,0 +1,142 @@
# Cardano Chain Data API
REST API for querying Cardano blockchain data via db-sync PostgreSQL database.
## Stack
- **FastAPI** — async Python API framework
- **asyncpg** — async PostgreSQL driver
- **Redis** — rate limiting + response caching
## Deployment
```bash
cd /opt/cardano/dbsync
sudo docker compose up -d --build
```
API runs on `127.0.0.1:8765` (localhost only, VPN access via future rebind to `192.168.254.105:8765`).
## Authentication
### Rate Limits
- Anonymous (no key): 20 req/min per IP
- Standard API key: 100 req/min
- Elevated API key: 1000 req/min
- Master key: unlimited
### Using API Keys
```bash
# Header (preferred)
curl -H "X-API-Key: capi_xxx" http://127.0.0.1:8765/v1/block/latest
# Query param
curl http://127.0.0.1:8765/v1/block/latest?api_key=capi_xxx
```
## Endpoints
### Sync Status
```
GET /v1/sync/status
```
### Blocks
```
GET /v1/block/latest
GET /v1/block/{block_no}
```
### Addresses
```
GET /v1/address/{address}/balance
GET /v1/address/{address}/tokens
GET /v1/address/{address}/transactions?page=1&limit=20&order=desc
```
### Transactions
```
GET /v1/tx/{tx_hash}
```
### Assets
```
GET /v1/asset/{policy_id}/info
GET /v1/asset/{policy_id}/{asset_name}/holders?limit=20
```
### Pools
```
GET /v1/pool/{pool_id}/info
```
### Admin (master key required)
```
POST /admin/keys — create API key
DELETE /admin/keys/{key} — revoke key
GET /admin/keys — list all keys
GET /admin/stats — usage stats
```
## Known Policy IDs
- **TRP**: `9c4bd4a90cdb73d9ff681215ecf7dea9fb183d916d30487d17098e05`
- **MAP**: `24bd9e7b9ae3a61df79eca72fd8355d0f7767e4c55a04a0d919c019c`
## Future: TRP Token Gating
Design for decentralized, permissionless API access based on TRP token holdings:
### Tier Mapping
- 0 TRP → anonymous rate limits (20 req/min)
- 50+ TRP → standard tier (100 req/min)
- 500+ TRP → elevated tier (1000 req/min)
### Implementation Plan
1. `POST /admin/keys/verify-trp` endpoint
2. Takes Cardano address, queries `/v1/address/{addr}/tokens`
3. Checks TRP policy balance
4. Auto-creates or upgrades API key based on holdings
5. Stores `owner` (address) and `trp_balance` in key hash
### Data Model (already in place)
API keys stored in Redis as:
```
apikey:<key> → {
tier: "standard"|"elevated",
label: "...",
owner: "addr1...", # Cardano address
trp_balance: 500, # Last verified TRP balance
created_at: "..."
}
```
### Benefits
- **Permissionless**: Anyone can verify holdings and get access
- **Decentralized**: No manual approval needed
- **Incentivized**: Holding TRP = better API access
- **Revocable**: Re-verify periodically to maintain tier
## Caching
Response TTLs:
- Balance/tokens: 60s
- Transactions: 30s
- Latest block: 10s
- TX details: 300s (immutable)
- Asset info: 120s
- Pool info: 120s
- Sync status: 5s
## Environment Variables
```
DB_HOST=postgres-dbsync
DB_PORT=5432
DB_NAME=cexplorer
DB_USER=dbsync
DB_PASS=...
REDIS_HOST=redis-api
REDIS_PORT=6379
API_MASTER_KEY=capi_...
```

878
main.py Normal file
View file

@ -0,0 +1,878 @@
"""
Cardano Chain Data REST API
Sits in front of cardano-db-sync PostgreSQL database.
Known policy IDs:
- TRP: 9c4bd4a90cdb73d9ff681215ecf7dea9fb183d916d30487d17098e05
- MAP: 24bd9e7b9ae3a61df79eca72fd8355d0f7767e4c55a04a0d919c019c
Future TRP gating design:
- POST /admin/keys/verify-trp check address TRP balance, auto-upgrade tier
- 0 TRP anonymous rate limits (20 req/min)
- 50+ TRP standard tier (100 req/min)
- 500+ TRP elevated tier (1000 req/min)
- Verification: query /v1/address/{addr}/tokens, check TRP policy balance
- Makes API access decentralized and permissionless hold tokens, get access
"""
import os
import json
import hashlib
import secrets
import time
import logging
from datetime import datetime, timezone
from typing import Optional
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException, Query, Header, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import asyncpg
import redis.asyncio as redis
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Environment config
DB_HOST = os.getenv("DB_HOST", "postgres-dbsync")
DB_PORT = int(os.getenv("DB_PORT", "5432"))
DB_NAME = os.getenv("DB_NAME", "cexplorer")
DB_USER = os.getenv("DB_USER", "dbsync")
DB_PASS = os.getenv("DB_PASS", "")
REDIS_HOST = os.getenv("REDIS_HOST", "redis-api")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
API_MASTER_KEY = os.getenv("API_MASTER_KEY", "")
# Rate limits (requests per minute)
RATE_LIMITS = {
"anonymous": 20,
"standard": 100,
"elevated": 1000,
"master": float("inf")
}
# Cache TTLs (seconds)
CACHE_TTLS = {
"balance": 60,
"tokens": 60,
"transactions": 30,
"block_latest": 10,
"tx_details": 300,
"asset_info": 120,
"pool_info": 120,
"sync_status": 5
}
# Global connections
db_pool: Optional[asyncpg.Pool] = None
redis_client: Optional[redis.Redis] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage database and redis connections."""
global db_pool, redis_client
# Connect to PostgreSQL
logger.info(f"Connecting to PostgreSQL at {DB_HOST}:{DB_PORT}/{DB_NAME}")
db_pool = await asyncpg.create_pool(
host=DB_HOST,
port=DB_PORT,
database=DB_NAME,
user=DB_USER,
password=DB_PASS,
min_size=2,
max_size=10
)
# Connect to Redis
logger.info(f"Connecting to Redis at {REDIS_HOST}:{REDIS_PORT}")
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
logger.info("Cardano API started successfully")
yield
# Cleanup
if db_pool:
await db_pool.close()
if redis_client:
await redis_client.close()
app = FastAPI(
title="Cardano Chain Data API",
description="REST API for querying Cardano blockchain data via db-sync",
version="1.0.0",
lifespan=lifespan
)
# ============ Models ============
class APIKeyCreate(BaseModel):
label: str
tier: str = "standard" # standard or elevated
owner: Optional[str] = None # Cardano address for future TRP verification
trp_balance: Optional[int] = 0 # For future TRP-gating
class APIKeyResponse(BaseModel):
key: str
label: str
tier: str
owner: Optional[str]
created_at: str
# ============ Helpers ============
async def get_api_key_info(key: str) -> Optional[dict]:
"""Get API key info from Redis."""
if not redis_client:
return None
data = await redis_client.hgetall(f"apikey:{key}")
return data if data else None
async def check_rate_limit(identifier: str, tier: str) -> tuple[bool, int]:
"""Check rate limit. Returns (allowed, retry_after_seconds)."""
if tier == "master":
return True, 0
limit = RATE_LIMITS.get(tier, RATE_LIMITS["anonymous"])
key = f"ratelimit:{identifier}:{int(time.time() // 60)}"
current = await redis_client.incr(key)
if current == 1:
await redis_client.expire(key, 60)
if current > limit:
return False, 60 - (int(time.time()) % 60)
return True, 0
async def get_cached(cache_key: str) -> Optional[dict]:
"""Get cached response."""
data = await redis_client.get(f"cache:{cache_key}")
if data:
return json.loads(data)
return None
async def set_cached(cache_key: str, data: dict, ttl: int):
"""Cache a response."""
await redis_client.setex(f"cache:{cache_key}", ttl, json.dumps(data))
def get_client_ip(request: Request) -> str:
"""Extract client IP from request."""
forwarded = request.headers.get("X-Forwarded-For")
if forwarded:
return forwarded.split(",")[0].strip()
return request.client.host if request.client else "unknown"
# ============ Auth Dependency ============
async def get_auth_context(
request: Request,
x_api_key: Optional[str] = Header(None),
api_key: Optional[str] = Query(None)
) -> dict:
"""Extract authentication context from request."""
key = x_api_key or api_key
client_ip = get_client_ip(request)
if key:
if key == API_MASTER_KEY:
return {"tier": "master", "identifier": "master", "label": "master"}
key_info = await get_api_key_info(key)
if key_info:
return {
"tier": key_info.get("tier", "standard"),
"identifier": key,
"label": key_info.get("label", "unknown")
}
return {"tier": "anonymous", "identifier": client_ip, "label": None}
async def require_master_key(auth: dict = Depends(get_auth_context)):
"""Require master key for admin endpoints."""
if auth["tier"] != "master":
raise HTTPException(status_code=403, detail="Master key required")
return auth
# ============ Middleware ============
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
"""Rate limiting and request logging middleware."""
start_time = time.time()
# Skip rate limiting for health check
if request.url.path == "/health":
return await call_next(request)
# Get auth context
x_api_key = request.headers.get("X-API-Key")
api_key = request.query_params.get("api_key")
key = x_api_key or api_key
client_ip = get_client_ip(request)
tier = "anonymous"
identifier = client_ip
label = None
if key:
if key == API_MASTER_KEY:
tier = "master"
identifier = "master"
label = "master"
else:
key_info = await get_api_key_info(key)
if key_info:
tier = key_info.get("tier", "standard")
identifier = key
label = key_info.get("label", "unknown")
# Check rate limit
allowed, retry_after = await check_rate_limit(identifier, tier)
if not allowed:
logger.warning(f"Rate limit exceeded: {identifier} ({tier})")
return JSONResponse(
status_code=429,
content={"error": "rate_limit_exceeded", "retry_after": retry_after}
)
# Process request
response = await call_next(request)
# Log request
elapsed = int((time.time() - start_time) * 1000)
logger.info(f"{request.method} {request.url.path} | {label or client_ip} | {elapsed}ms | {response.status_code}")
return response
# ============ Health & Sync ============
@app.get("/health")
async def health_check():
"""Basic health check."""
return {"status": "ok"}
@app.get("/v1/sync/status")
async def get_sync_status(auth: dict = Depends(get_auth_context)):
"""Get db-sync synchronization status."""
cache_key = "sync_status"
cached = await get_cached(cache_key)
if cached:
return cached
async with db_pool.acquire() as conn:
# Get latest block in db-sync
db_tip = await conn.fetchrow("""
SELECT block_no, epoch_no, slot_no, time
FROM block ORDER BY id DESC LIMIT 1
""")
# Get meta info for sync percentage
meta = await conn.fetchrow("SELECT * FROM meta LIMIT 1")
if not db_tip:
return {"error": "db_sync_not_ready", "message": "No blocks synced yet"}
result = {
"db_sync_tip": db_tip["block_no"],
"epoch_no": db_tip["epoch_no"],
"slot_no": db_tip["slot_no"],
"last_block_time": db_tip["time"].isoformat() if db_tip["time"] else None,
"is_syncing": True # Assume syncing unless we have better info
}
await set_cached(cache_key, result, CACHE_TTLS["sync_status"])
return result
# ============ Block Endpoints ============
@app.get("/v1/block/latest")
async def get_latest_block(auth: dict = Depends(get_auth_context)):
"""Get the latest block."""
cache_key = "block_latest"
cached = await get_cached(cache_key)
if cached:
return cached
async with db_pool.acquire() as conn:
block = await conn.fetchrow("""
SELECT b.block_no, b.epoch_no, b.slot_no,
encode(b.hash, 'hex') as hash,
b.tx_count, b.time
FROM block b
ORDER BY b.id DESC LIMIT 1
""")
if not block:
raise HTTPException(status_code=503, detail={"error": "db_sync_not_ready", "message": "No blocks synced yet"})
result = {
"block_no": block["block_no"],
"epoch_no": block["epoch_no"],
"slot_no": block["slot_no"],
"hash": block["hash"],
"tx_count": block["tx_count"],
"time": block["time"].isoformat() if block["time"] else None
}
await set_cached(cache_key, result, CACHE_TTLS["block_latest"])
return result
@app.get("/v1/block/{block_no}")
async def get_block(block_no: int, auth: dict = Depends(get_auth_context)):
"""Get a specific block by number."""
cache_key = f"block_{block_no}"
cached = await get_cached(cache_key)
if cached:
return cached
async with db_pool.acquire() as conn:
block = await conn.fetchrow("""
SELECT b.block_no, b.epoch_no, b.slot_no,
encode(b.hash, 'hex') as hash,
b.tx_count, b.time
FROM block b
WHERE b.block_no = $1
""", block_no)
if not block:
raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Block {block_no} not found"})
result = {
"block_no": block["block_no"],
"epoch_no": block["epoch_no"],
"slot_no": block["slot_no"],
"hash": block["hash"],
"tx_count": block["tx_count"],
"time": block["time"].isoformat() if block["time"] else None
}
await set_cached(cache_key, result, CACHE_TTLS["tx_details"]) # Blocks are immutable
return result
# ============ Address Endpoints ============
@app.get("/v1/address/{address}/balance")
async def get_address_balance(address: str, auth: dict = Depends(get_auth_context)):
"""Get address balance including native tokens."""
cache_key = f"balance_{address}"
cached = await get_cached(cache_key)
if cached:
return cached
async with db_pool.acquire() as conn:
# Get ADA balance from UTxOs
balance = await conn.fetchrow("""
SELECT COALESCE(SUM(value), 0) as lovelace
FROM tx_out txo
JOIN tx ON tx.id = txo.tx_id
LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
WHERE txo.address = $1 AND txi.id IS NULL
""", address)
# Get native tokens
tokens = await conn.fetch("""
SELECT
encode(ma.policy, 'hex') as policy_id,
encode(ma.name, 'hex') as asset_name_hex,
convert_from(ma.name, 'UTF8') as asset_name,
ma.fingerprint,
COALESCE(SUM(mto.quantity), 0) as quantity
FROM ma_tx_out mto
JOIN multi_asset ma ON ma.id = mto.ident
JOIN tx_out txo ON txo.id = mto.tx_out_id
LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
WHERE txo.address = $1 AND txi.id IS NULL
GROUP BY ma.id, ma.policy, ma.name, ma.fingerprint
HAVING SUM(mto.quantity) > 0
""", address)
lovelace = int(balance["lovelace"]) if balance else 0
result = {
"address": address,
"lovelace": lovelace,
"ada": lovelace / 1_000_000,
"tokens": [
{
"policy_id": t["policy_id"],
"asset_name": t["asset_name"] or t["asset_name_hex"],
"asset_name_hex": t["asset_name_hex"],
"fingerprint": t["fingerprint"],
"quantity": int(t["quantity"])
}
for t in tokens
]
}
await set_cached(cache_key, result, CACHE_TTLS["balance"])
return result
@app.get("/v1/address/{address}/tokens")
async def get_address_tokens(address: str, auth: dict = Depends(get_auth_context)):
"""Get native tokens held by an address."""
cache_key = f"tokens_{address}"
cached = await get_cached(cache_key)
if cached:
return cached
async with db_pool.acquire() as conn:
tokens = await conn.fetch("""
SELECT
encode(ma.policy, 'hex') as policy_id,
encode(ma.name, 'hex') as asset_name_hex,
convert_from(ma.name, 'UTF8') as asset_name,
ma.fingerprint,
COALESCE(SUM(mto.quantity), 0) as quantity
FROM ma_tx_out mto
JOIN multi_asset ma ON ma.id = mto.ident
JOIN tx_out txo ON txo.id = mto.tx_out_id
LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
WHERE txo.address = $1 AND txi.id IS NULL
GROUP BY ma.id, ma.policy, ma.name, ma.fingerprint
HAVING SUM(mto.quantity) > 0
ORDER BY quantity DESC
""", address)
result = {
"address": address,
"tokens": [
{
"policy_id": t["policy_id"],
"asset_name": t["asset_name"] or t["asset_name_hex"],
"asset_name_hex": t["asset_name_hex"],
"fingerprint": t["fingerprint"],
"quantity": int(t["quantity"])
}
for t in tokens
]
}
await set_cached(cache_key, result, CACHE_TTLS["tokens"])
return result
@app.get("/v1/address/{address}/transactions")
async def get_address_transactions(
address: str,
page: int = Query(1, ge=1),
limit: int = Query(20, ge=1, le=100),
order: str = Query("desc", regex="^(asc|desc)$"),
auth: dict = Depends(get_auth_context)
):
"""Get transactions for an address."""
cache_key = f"txs_{address}_{page}_{limit}_{order}"
cached = await get_cached(cache_key)
if cached:
return cached
offset = (page - 1) * limit
order_dir = "DESC" if order == "desc" else "ASC"
async with db_pool.acquire() as conn:
# Count total transactions
count_result = await conn.fetchrow("""
SELECT COUNT(DISTINCT tx.id) as total
FROM tx
JOIN tx_out ON tx_out.tx_id = tx.id
WHERE tx_out.address = $1
UNION
SELECT COUNT(DISTINCT tx.id) as total
FROM tx
JOIN tx_in ON tx_in.tx_in_id = tx.id
JOIN tx_out ON tx_out.tx_id = tx_in.tx_out_id AND tx_out.index = tx_in.tx_out_index
WHERE tx_out.address = $1
""", address)
# Get transactions
txs = await conn.fetch(f"""
WITH addr_txs AS (
SELECT DISTINCT tx.id, tx.hash, tx.fee, b.block_no, b.time
FROM tx
JOIN block b ON b.id = tx.block_id
JOIN tx_out ON tx_out.tx_id = tx.id
WHERE tx_out.address = $1
UNION
SELECT DISTINCT tx.id, tx.hash, tx.fee, b.block_no, b.time
FROM tx
JOIN block b ON b.id = tx.block_id
JOIN tx_in ON tx_in.tx_in_id = tx.id
JOIN tx_out ON tx_out.tx_id = tx_in.tx_out_id AND tx_out.index = tx_in.tx_out_index
WHERE tx_out.address = $1
)
SELECT * FROM addr_txs
ORDER BY time {order_dir}
LIMIT $2 OFFSET $3
""", address, limit, offset)
result = {
"address": address,
"page": page,
"limit": limit,
"transactions": [
{
"tx_hash": tx["hash"].hex() if isinstance(tx["hash"], bytes) else tx["hash"],
"block_no": tx["block_no"],
"block_time": tx["time"].isoformat() if tx["time"] else None,
"fee": int(tx["fee"]) if tx["fee"] else 0
}
for tx in txs
]
}
await set_cached(cache_key, result, CACHE_TTLS["transactions"])
return result
# ============ Transaction Endpoints ============
@app.get("/v1/tx/{tx_hash}")
async def get_transaction(tx_hash: str, auth: dict = Depends(get_auth_context)):
"""Get transaction details by hash."""
cache_key = f"tx_{tx_hash}"
cached = await get_cached(cache_key)
if cached:
return cached
# Clean up hash (remove 0x prefix if present)
clean_hash = tx_hash.lower().replace("0x", "")
async with db_pool.acquire() as conn:
tx = await conn.fetchrow("""
SELECT tx.id, encode(tx.hash, 'hex') as hash, tx.fee,
b.block_no, b.time, b.epoch_no
FROM tx
JOIN block b ON b.id = tx.block_id
WHERE tx.hash = decode($1, 'hex')
""", clean_hash)
if not tx:
raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Transaction {tx_hash} not found"})
# Get inputs
inputs = await conn.fetch("""
SELECT txo.address, txo.value,
encode(tx_src.hash, 'hex') as source_tx_hash,
txo.index as source_tx_index
FROM tx_in txi
JOIN tx_out txo ON txo.tx_id = txi.tx_out_id AND txo.index = txi.tx_out_index
JOIN tx tx_src ON tx_src.id = txi.tx_out_id
WHERE txi.tx_in_id = $1
""", tx["id"])
# Get outputs
outputs = await conn.fetch("""
SELECT address, value, index
FROM tx_out
WHERE tx_id = $1
ORDER BY index
""", tx["id"])
# Get metadata
metadata = await conn.fetch("""
SELECT key, json
FROM tx_metadata
WHERE tx_id = $1
""", tx["id"])
result = {
"tx_hash": tx["hash"],
"block_no": tx["block_no"],
"block_time": tx["time"].isoformat() if tx["time"] else None,
"epoch_no": tx["epoch_no"],
"fee": int(tx["fee"]) if tx["fee"] else 0,
"inputs": [
{
"address": i["address"],
"value": int(i["value"]),
"source_tx_hash": i["source_tx_hash"],
"source_tx_index": i["source_tx_index"]
}
for i in inputs
],
"outputs": [
{
"address": o["address"],
"value": int(o["value"]),
"index": o["index"]
}
for o in outputs
],
"metadata": {str(m["key"]): m["json"] for m in metadata} if metadata else None
}
await set_cached(cache_key, result, CACHE_TTLS["tx_details"])
return result
# ============ Asset Endpoints ============
@app.get("/v1/asset/{policy_id}/info")
async def get_asset_info(policy_id: str, auth: dict = Depends(get_auth_context)):
"""Get info about all assets under a policy ID."""
cache_key = f"asset_info_{policy_id}"
cached = await get_cached(cache_key)
if cached:
return cached
clean_policy = policy_id.lower().replace("0x", "")
async with db_pool.acquire() as conn:
assets = await conn.fetch("""
SELECT
encode(ma.name, 'hex') as asset_name_hex,
convert_from(ma.name, 'UTF8') as asset_name,
ma.fingerprint,
(SELECT COALESCE(SUM(quantity), 0) FROM ma_tx_mint WHERE ident = ma.id) as total_minted,
(SELECT COUNT(*) FROM ma_tx_mint WHERE ident = ma.id AND quantity > 0) as mint_count
FROM multi_asset ma
WHERE ma.policy = decode($1, 'hex')
""", clean_policy)
if not assets:
raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Policy {policy_id} not found"})
result = {
"policy_id": policy_id,
"assets": [
{
"asset_name": a["asset_name"] or a["asset_name_hex"],
"asset_name_hex": a["asset_name_hex"],
"fingerprint": a["fingerprint"],
"total_supply": int(a["total_minted"]) if a["total_minted"] else 0,
"mint_count": a["mint_count"]
}
for a in assets
]
}
await set_cached(cache_key, result, CACHE_TTLS["asset_info"])
return result
@app.get("/v1/asset/{policy_id}/{asset_name}/holders")
async def get_asset_holders(
policy_id: str,
asset_name: str,
limit: int = Query(20, ge=1, le=100),
auth: dict = Depends(get_auth_context)
):
"""Get top holders of a specific asset."""
cache_key = f"holders_{policy_id}_{asset_name}_{limit}"
cached = await get_cached(cache_key)
if cached:
return cached
clean_policy = policy_id.lower().replace("0x", "")
async with db_pool.acquire() as conn:
# Get asset ID
asset = await conn.fetchrow("""
SELECT id FROM multi_asset
WHERE policy = decode($1, 'hex') AND name = decode($2, 'hex')
""", clean_policy, asset_name)
if not asset:
raise HTTPException(status_code=404, detail={"error": "not_found", "message": "Asset not found"})
# Get holders
holders = await conn.fetch("""
SELECT txo.address, SUM(mto.quantity) as quantity
FROM ma_tx_out mto
JOIN tx_out txo ON txo.id = mto.tx_out_id
LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
WHERE mto.ident = $1 AND txi.id IS NULL
GROUP BY txo.address
HAVING SUM(mto.quantity) > 0
ORDER BY quantity DESC
LIMIT $2
""", asset["id"], limit)
# Count total holders
holder_count = await conn.fetchrow("""
SELECT COUNT(DISTINCT txo.address) as count
FROM ma_tx_out mto
JOIN tx_out txo ON txo.id = mto.tx_out_id
LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
WHERE mto.ident = $1 AND txi.id IS NULL
GROUP BY txo.address
HAVING SUM(mto.quantity) > 0
""", asset["id"])
result = {
"policy_id": policy_id,
"asset_name": asset_name,
"holder_count": holder_count["count"] if holder_count else 0,
"holders": [
{"address": h["address"], "quantity": int(h["quantity"])}
for h in holders
]
}
await set_cached(cache_key, result, CACHE_TTLS["asset_info"])
return result
# ============ Pool Endpoints ============
@app.get("/v1/pool/{pool_id}/info")
async def get_pool_info(pool_id: str, auth: dict = Depends(get_auth_context)):
"""Get stake pool information."""
cache_key = f"pool_{pool_id}"
cached = await get_cached(cache_key)
if cached:
return cached
async with db_pool.acquire() as conn:
# Try to find pool by bech32 id or hash
pool = await conn.fetchrow("""
SELECT
ph.view as pool_id,
encode(ph.hash_raw, 'hex') as pool_hash,
pod.ticker_name as ticker,
pod.json->>'name' as name,
pod.json->>'description' as description,
pu.pledge,
pu.margin,
(SELECT COALESCE(SUM(amount), 0) FROM epoch_stake
WHERE pool_id = ph.id AND epoch_no = (SELECT MAX(epoch_no) FROM epoch_stake WHERE pool_id = ph.id)) as active_stake,
(SELECT COUNT(*) FROM block WHERE slot_leader_id IN (SELECT id FROM slot_leader WHERE pool_hash_id = ph.id)) as block_count
FROM pool_hash ph
LEFT JOIN pool_offline_data pod ON pod.pool_id = ph.id
LEFT JOIN pool_update pu ON pu.hash_id = ph.id
AND pu.registered_tx_id = (SELECT MAX(registered_tx_id) FROM pool_update WHERE hash_id = ph.id)
WHERE ph.view = $1 OR encode(ph.hash_raw, 'hex') = $1
""", pool_id.lower())
if not pool:
raise HTTPException(status_code=404, detail={"error": "not_found", "message": f"Pool {pool_id} not found"})
result = {
"pool_id": pool["pool_id"],
"pool_hash": pool["pool_hash"],
"ticker": pool["ticker"],
"name": pool["name"],
"description": pool["description"],
"pledge": int(pool["pledge"]) if pool["pledge"] else 0,
"margin": float(pool["margin"]) if pool["margin"] else 0,
"active_stake": int(pool["active_stake"]) if pool["active_stake"] else 0,
"block_count": pool["block_count"] or 0
}
await set_cached(cache_key, result, CACHE_TTLS["pool_info"])
return result
# ============ Admin Endpoints ============
@app.post("/admin/keys")
async def create_api_key(key_data: APIKeyCreate, auth: dict = Depends(require_master_key)):
"""Create a new API key."""
new_key = f"capi_{secrets.token_hex(24)}"
await redis_client.hset(f"apikey:{new_key}", mapping={
"label": key_data.label,
"tier": key_data.tier,
"owner": key_data.owner or "",
"trp_balance": str(key_data.trp_balance or 0),
"created_at": datetime.now(timezone.utc).isoformat()
})
return APIKeyResponse(
key=new_key,
label=key_data.label,
tier=key_data.tier,
owner=key_data.owner,
created_at=datetime.now(timezone.utc).isoformat()
)
@app.delete("/admin/keys/{key}")
async def revoke_api_key(key: str, auth: dict = Depends(require_master_key)):
"""Revoke an API key."""
deleted = await redis_client.delete(f"apikey:{key}")
if not deleted:
raise HTTPException(status_code=404, detail={"error": "not_found", "message": "API key not found"})
return {"status": "revoked", "key": key}
@app.get("/admin/keys")
async def list_api_keys(auth: dict = Depends(require_master_key)):
"""List all API keys."""
keys = []
cursor = 0
while True:
cursor, found_keys = await redis_client.scan(cursor, match="apikey:*", count=100)
for key in found_keys:
data = await redis_client.hgetall(key)
if data:
keys.append({
"key": key.replace("apikey:", ""),
"label": data.get("label"),
"tier": data.get("tier"),
"owner": data.get("owner") or None,
"created_at": data.get("created_at")
})
if cursor == 0:
break
return {"keys": keys, "count": len(keys)}
@app.get("/admin/stats")
async def get_admin_stats(auth: dict = Depends(require_master_key)):
"""Get API usage statistics."""
# Count rate limit keys (active users in current minute)
current_minute = int(time.time() // 60)
active_users = 0
cursor = 0
while True:
cursor, keys = await redis_client.scan(cursor, match=f"ratelimit:*:{current_minute}", count=100)
active_users += len(keys)
if cursor == 0:
break
# Count API keys
api_key_count = 0
cursor = 0
while True:
cursor, keys = await redis_client.scan(cursor, match="apikey:*", count=100)
api_key_count += len(keys)
if cursor == 0:
break
# Get db-sync status
async with db_pool.acquire() as conn:
latest_block = await conn.fetchrow("SELECT block_no, time FROM block ORDER BY id DESC LIMIT 1")
tx_count = await conn.fetchrow("SELECT COUNT(*) as count FROM tx")
return {
"active_users_this_minute": active_users,
"total_api_keys": api_key_count,
"db_sync_block": latest_block["block_no"] if latest_block else 0,
"db_sync_time": latest_block["time"].isoformat() if latest_block and latest_block["time"] else None,
"total_transactions_synced": tx_count["count"] if tx_count else 0
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8765)

6
requirements.txt Normal file
View file

@ -0,0 +1,6 @@
fastapi==0.115.0
uvicorn[standard]==0.30.0
asyncpg==0.30.0
redis==5.0.0
pydantic==2.9.0
python-dotenv==1.0.0