feat: Add node integration, TRP-gated auth, CIP-8 verification

- Node integration endpoints:
  - GET /v1/address/{address}/utxos - query UTxOs directly from node
  - POST /v1/tx/submit - submit signed transactions
  - GET /v1/protocol-params - current epoch protocol parameters

- TRP-gated permissionless API keys:
  - POST /v1/auth/challenge - get nonce for wallet signing
  - POST /v1/auth/verify - verify CIP-8 signature, issue key based on TRP balance
  - POST /v1/auth/refresh - re-check TRP balance and update tier
  - Background task: hourly tier refresh for all TRP-gated keys

- Tier thresholds: 50+ TRP = standard, 500+ TRP = elevated
- TX submit rate limits: anonymous=blocked, standard=2/min, elevated=10/min
- Added pycardano, cbor2, PyNaCl dependencies
- Updated Dockerfile with cardano-cli binary
This commit is contained in:
Kayos 2026-03-21 08:52:46 -07:00
parent 631a0aa2a0
commit 163de03322
3 changed files with 756 additions and 22 deletions

View file

@ -1,6 +1,36 @@
FROM python:3.12-slim
# Install dependencies for cardano-cli
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
ca-certificates \
libsodium23 \
libsecp256k1-1 \
libnuma1 \
&& rm -rf /var/lib/apt/lists/*
# Download and install cardano-cli
# Using the official release from cardano-node 10.4.1 (compatible with current network)
ARG CARDANO_CLI_VERSION=10.4.1
RUN curl -L "https://github.com/IntersectMBO/cardano-node/releases/download/${CARDANO_CLI_VERSION}/cardano-node-${CARDANO_CLI_VERSION}-linux.tar.gz" \
-o /tmp/cardano-node.tar.gz && \
mkdir -p /tmp/cardano && \
tar -xzf /tmp/cardano-node.tar.gz -C /tmp/cardano && \
cp /tmp/cardano/bin/cardano-cli /usr/local/bin/ && \
chmod +x /usr/local/bin/cardano-cli && \
rm -rf /tmp/cardano /tmp/cardano-node.tar.gz
# Verify cardano-cli installation
RUN cardano-cli --version
WORKDIR /app
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Run with uvicorn
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8765", "--workers", "2"]

745
main.py
View file

@ -1,18 +1,17 @@
"""
Cardano Chain Data REST API
Sits in front of cardano-db-sync PostgreSQL database.
Includes node integration for UTxO queries, tx submission, and protocol params.
TRP-gated permissionless API keys with CIP-8 wallet signature verification.
Known policy IDs:
- TRP: 9c4bd4a90cdb73d9ff681215ecf7dea9fb183d916d30487d17098e05
- MAP: 24bd9e7b9ae3a61df79eca72fd8355d0f7767e4c55a04a0d919c019c
Future TRP gating design:
- POST /admin/keys/verify-trp check address TRP balance, auto-upgrade tier
TRP Gating Tiers:
- 0 TRP anonymous rate limits (20 req/min)
- 50+ TRP standard tier (100 req/min)
- 500+ TRP elevated tier (1000 req/min)
- Verification: query /v1/address/{addr}/tokens, check TRP policy balance
- Makes API access decentralized and permissionless hold tokens, get access
"""
import os
@ -21,16 +20,21 @@ import hashlib
import secrets
import time
import logging
from datetime import datetime, timezone
import asyncio
import subprocess
import tempfile
import base64
from datetime import datetime, timezone, timedelta
from typing import Optional
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException, Query, Header, Depends
from fastapi import FastAPI, Request, HTTPException, Query, Header, Depends, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from pydantic import BaseModel, Field
import asyncpg
from asyncpg.exceptions import UndefinedTableError, PostgresError
import redis.asyncio as redis
import cbor2
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@ -45,6 +49,19 @@ DB_PASS = os.getenv("DB_PASS", "")
REDIS_HOST = os.getenv("REDIS_HOST", "redis-api")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
API_MASTER_KEY = os.getenv("API_MASTER_KEY", "")
CARDANO_NODE_SOCKET_PATH = os.getenv("CARDANO_NODE_SOCKET_PATH", "/node-ipc/node.socket")
CARDANO_NETWORK = os.getenv("CARDANO_NETWORK", "mainnet")
# Known policy IDs
TRP_POLICY_ID = "9c4bd4a90cdb73d9ff681215ecf7dea9fb183d916d30487d17098e05"
MAP_POLICY_ID = "24bd9e7b9ae3a61df79eca72fd8355d0f7767e4c55a04a0d919c019c"
# TRP Tier thresholds
TRP_TIERS = {
"elevated": 500,
"standard": 50,
"anonymous": 0
}
# Rate limits (requests per minute)
RATE_LIMITS = {
@ -54,6 +71,14 @@ RATE_LIMITS = {
"master": float("inf")
}
# TX Submit rate limits (per minute)
TX_SUBMIT_LIMITS = {
"anonymous": 0, # blocked
"standard": 2,
"elevated": 10,
"master": float("inf")
}
# Cache TTLs (seconds)
CACHE_TTLS = {
"balance": 60,
@ -63,12 +88,225 @@ CACHE_TTLS = {
"tx_details": 300,
"asset_info": 120,
"pool_info": 120,
"sync_status": 5
"sync_status": 5,
"protocol_params": 300, # 5 min cache for protocol params
"utxos": 10 # Short cache for UTxOs
}
# Global connections
db_pool: Optional[asyncpg.Pool] = None
redis_client: Optional[redis.Redis] = None
protocol_params_cache: dict = {"data": None, "expires": 0}
# ============ Helper Functions ============
def run_cardano_cli(args: list[str], timeout: int = 30) -> tuple[bool, str, str]:
"""Run cardano-cli command and return (success, stdout, stderr)."""
cmd = ["cardano-cli"] + args
env = os.environ.copy()
env["CARDANO_NODE_SOCKET_PATH"] = CARDANO_NODE_SOCKET_PATH
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
env=env
)
return result.returncode == 0, result.stdout, result.stderr
except subprocess.TimeoutExpired:
return False, "", "Command timed out"
except FileNotFoundError:
return False, "", "cardano-cli not found"
except Exception as e:
return False, "", str(e)
def decode_hex_or_base64(data: str) -> bytes:
"""Decode hex or base64 encoded data."""
# Try hex first
try:
if data.startswith("0x"):
return bytes.fromhex(data[2:])
return bytes.fromhex(data)
except ValueError:
pass
# Try base64
try:
return base64.b64decode(data)
except Exception:
pass
raise ValueError("Invalid hex or base64 encoding")
def get_tier_from_trp_balance(balance: int) -> str:
"""Determine tier based on TRP balance."""
if balance >= TRP_TIERS["elevated"]:
return "elevated"
elif balance >= TRP_TIERS["standard"]:
return "standard"
return "anonymous"
# ============ CIP-8 Signature Verification ============
def verify_cip8_signature(address: str, nonce: str, signature_hex: str, key_hex: str) -> bool:
"""
Verify a CIP-8 message signature.
CIP-8 spec: https://cips.cardano.org/cip/CIP-8
The signature is over a COSE_Sign1 structure containing the message.
We use pycardano for verification when available, with fallback to manual verification.
"""
try:
from pycardano import Address, VerificationKey
from pycardano.hash import blake2b_224
# Decode the key (should be a 32-byte Ed25519 public key)
key_bytes = bytes.fromhex(key_hex)
vkey = VerificationKey.from_primitive(key_bytes)
# Verify the public key hashes to the address
addr = Address.from_primitive(address)
# Get the payment credential hash from the address
if addr.payment_part is None:
logger.warning("Address has no payment part")
return False
expected_keyhash = addr.payment_part.payload
# Hash the verification key to get keyhash
actual_keyhash = blake2b_224(key_bytes)
if expected_keyhash != actual_keyhash:
logger.warning("Key hash mismatch")
return False
# Decode the COSE signature
signature_bytes = bytes.fromhex(signature_hex)
# CIP-8 uses COSE_Sign1 structure
# The signature is a CBOR array: [protected_headers, unprotected_headers, payload, signature]
cose_sign1 = cbor2.loads(signature_bytes)
if not isinstance(cose_sign1, (list, tuple)) or len(cose_sign1) != 4:
# Try unwrapping if it's tagged
if hasattr(cose_sign1, 'value'):
cose_sign1 = cose_sign1.value
if not isinstance(cose_sign1, (list, tuple)) or len(cose_sign1) != 4:
logger.warning("Invalid COSE_Sign1 structure")
return False
protected, unprotected, payload, sig = cose_sign1
# The payload should contain our nonce
if payload:
payload_decoded = payload if isinstance(payload, bytes) else bytes(payload)
# Payload might be hex-encoded nonce or raw bytes
try:
if payload_decoded.hex() != nonce and payload_decoded.decode('utf-8') != nonce:
logger.warning("Payload doesn't match nonce")
return False
except:
if payload_decoded.hex() != nonce:
logger.warning("Payload doesn't match nonce (hex check)")
return False
# Build the Sig_structure for verification
# Sig_structure = ["Signature1", protected, external_aad, payload]
external_aad = b""
sig_structure = cbor2.dumps(["Signature1", protected, external_aad, payload or b""])
# Verify the Ed25519 signature
from nacl.signing import VerifyKey
from nacl.exceptions import BadSignature
verify_key = VerifyKey(key_bytes)
try:
verify_key.verify(sig_structure, sig)
return True
except BadSignature:
logger.warning("Bad signature")
return False
except ImportError as e:
logger.error(f"Missing dependency for CIP-8 verification: {e}")
return False
except Exception as e:
logger.error(f"CIP-8 verification error: {e}")
return False
# ============ Lifespan ============
async def refresh_trp_tiers_task():
"""Background task to refresh TRP tiers for all gated keys every hour."""
while True:
try:
await asyncio.sleep(3600) # Run every hour
await refresh_all_trp_tiers()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in TRP tier refresh task: {e}")
async def refresh_all_trp_tiers():
"""Check all TRP-gated keys and update tiers if balance changed."""
if not redis_client or not db_pool:
return
cursor = 0
updated = 0
while True:
cursor, keys = await redis_client.scan(cursor, match="apikey:*", count=100)
for key in keys:
data = await redis_client.hgetall(key)
owner = data.get("owner")
if owner and owner.startswith("addr"):
# This is a TRP-gated key, check balance
try:
trp_balance = await get_trp_balance(owner)
new_tier = get_tier_from_trp_balance(trp_balance)
old_tier = data.get("tier", "anonymous")
if new_tier != old_tier:
await redis_client.hset(key, mapping={
"tier": new_tier,
"trp_balance": str(trp_balance)
})
logger.info(f"Updated tier for {owner}: {old_tier} -> {new_tier}")
updated += 1
except Exception as e:
logger.error(f"Error checking TRP balance for {owner}: {e}")
if cursor == 0:
break
if updated > 0:
logger.info(f"TRP tier refresh complete: {updated} keys updated")
async def get_trp_balance(address: str) -> int:
"""Get TRP token balance for an address from db-sync."""
async with db_pool.acquire() as conn:
result = await conn.fetchrow("""
SELECT COALESCE(SUM(mto.quantity), 0) as quantity
FROM ma_tx_out mto
JOIN multi_asset ma ON ma.id = mto.ident
JOIN tx_out txo ON txo.id = mto.tx_out_id
LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
WHERE txo.address = $1
AND ma.policy = decode($2, 'hex')
AND txi.id IS NULL
""", address, TRP_POLICY_ID)
return int(result["quantity"]) if result else 0
@asynccontextmanager
@ -92,10 +330,25 @@ async def lifespan(app: FastAPI):
logger.info(f"Connecting to Redis at {REDIS_HOST}:{REDIS_PORT}")
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
# Check node socket
if os.path.exists(CARDANO_NODE_SOCKET_PATH):
logger.info(f"Cardano node socket found at {CARDANO_NODE_SOCKET_PATH}")
else:
logger.warning(f"Cardano node socket NOT found at {CARDANO_NODE_SOCKET_PATH} - node endpoints will fail")
# Start background task for TRP tier refresh
tier_refresh_task = asyncio.create_task(refresh_trp_tiers_task())
logger.info("Cardano API started successfully")
yield
# Cleanup
tier_refresh_task.cancel()
try:
await tier_refresh_task
except asyncio.CancelledError:
pass
if db_pool:
await db_pool.close()
if redis_client:
@ -104,8 +357,8 @@ async def lifespan(app: FastAPI):
app = FastAPI(
title="Cardano Chain Data API",
description="REST API for querying Cardano blockchain data via db-sync",
version="1.0.0",
description="REST API for querying Cardano blockchain data via db-sync and cardano-node",
version="2.0.0",
lifespan=lifespan
)
@ -149,6 +402,37 @@ class APIKeyResponse(BaseModel):
created_at: str
class AuthChallengeRequest(BaseModel):
address: str = Field(..., description="Cardano address (bech32)")
class AuthChallengeResponse(BaseModel):
nonce: str
expires_at: str
class AuthVerifyRequest(BaseModel):
address: str = Field(..., description="Cardano address (bech32)")
nonce: str = Field(..., description="Nonce from challenge")
signature: str = Field(..., description="CIP-8 signature (hex)")
key: str = Field(..., description="Public key used for signing (hex)")
class AuthVerifyResponse(BaseModel):
api_key: str
tier: str
trp_balance: int
class TxSubmitRequest(BaseModel):
tx: str = Field(..., description="Signed transaction in hex or base64 CBOR encoding")
class TxSubmitResponse(BaseModel):
tx_hash: str
message: str
# ============ Helpers ============
async def get_api_key_info(key: str) -> Optional[dict]:
@ -159,13 +443,18 @@ async def get_api_key_info(key: str) -> Optional[dict]:
return data if data else None
async def check_rate_limit(identifier: str, tier: str) -> tuple[bool, int]:
async def check_rate_limit(identifier: str, tier: str, limit_type: str = "general") -> tuple[bool, int]:
"""Check rate limit. Returns (allowed, retry_after_seconds)."""
if tier == "master":
return True, 0
limit = RATE_LIMITS.get(tier, RATE_LIMITS["anonymous"])
key = f"ratelimit:{identifier}:{int(time.time() // 60)}"
limits = TX_SUBMIT_LIMITS if limit_type == "tx_submit" else RATE_LIMITS
limit = limits.get(tier, limits.get("anonymous", 0))
if limit == 0:
return False, 0 # Blocked entirely
key = f"ratelimit:{limit_type}:{identifier}:{int(time.time() // 60)}"
current = await redis_client.incr(key)
if current == 1:
@ -217,7 +506,8 @@ async def get_auth_context(
return {
"tier": key_info.get("tier", "standard"),
"identifier": key,
"label": key_info.get("label", "unknown")
"label": key_info.get("label", "unknown"),
"owner": key_info.get("owner")
}
return {"tier": "anonymous", "identifier": client_ip, "label": None}
@ -263,7 +553,7 @@ async def rate_limit_middleware(request: Request, call_next):
identifier = key
label = key_info.get("label", "unknown")
# Check rate limit
# Check rate limit (general)
allowed, retry_after = await check_rate_limit(identifier, tier)
if not allowed:
logger.warning(f"Rate limit exceeded: {identifier} ({tier})")
@ -311,18 +601,413 @@ async def get_sync_status(auth: dict = Depends(get_auth_context)):
if not db_tip:
return {"error": "db_sync_not_ready", "message": "No blocks synced yet"}
# Check node tip for comparison
node_tip = None
success, stdout, stderr = run_cardano_cli(["query", "tip", "--mainnet"])
if success:
try:
node_tip = json.loads(stdout)
except:
pass
result = {
"db_sync_tip": db_tip["block_no"],
"epoch_no": db_tip["epoch_no"],
"slot_no": db_tip["slot_no"],
"last_block_time": db_tip["time"].isoformat() if db_tip["time"] else None,
"is_syncing": True # Assume syncing unless we have better info
"is_syncing": True, # Assume syncing unless we have better info
"node_tip": node_tip.get("block") if node_tip else None,
"node_connected": node_tip is not None
}
# If we have both tips, calculate sync percentage
if node_tip and db_tip["block_no"]:
node_block = node_tip.get("block", 0)
if node_block > 0:
sync_pct = (db_tip["block_no"] / node_block) * 100
result["sync_percentage"] = round(sync_pct, 2)
result["is_syncing"] = sync_pct < 99.9
await set_cached(cache_key, result, CACHE_TTLS["sync_status"])
return result
# ============ Node Integration Endpoints ============
@app.get("/v1/address/{address}/utxos")
async def get_address_utxos(address: str, auth: dict = Depends(get_auth_context)):
"""
Query UTxOs for an address directly from the node.
Faster than db-sync for current unspent outputs.
"""
cache_key = f"utxos_{address}"
cached = await get_cached(cache_key)
if cached:
return cached
# Query node directly using cardano-cli
success, stdout, stderr = run_cardano_cli([
"query", "utxo",
"--address", address,
"--mainnet",
"--output-json"
])
if not success:
if "Network.Socket.connect" in stderr or "does not exist" in stderr:
raise HTTPException(
status_code=503,
detail={"error": "node_unavailable", "message": "Cardano node not available"}
)
raise HTTPException(
status_code=500,
detail={"error": "node_error", "message": stderr}
)
try:
utxo_data = json.loads(stdout)
except json.JSONDecodeError:
raise HTTPException(
status_code=500,
detail={"error": "parse_error", "message": "Failed to parse node response"}
)
# Transform to our API format
utxos = []
total_lovelace = 0
for utxo_id, utxo_info in utxo_data.items():
tx_hash, tx_index = utxo_id.split("#")
lovelace = utxo_info.get("value", {}).get("lovelace", 0)
total_lovelace += lovelace
tokens = []
for policy_id, assets in utxo_info.get("value", {}).items():
if policy_id == "lovelace":
continue
if isinstance(assets, dict):
for asset_name, quantity in assets.items():
tokens.append({
"policy_id": policy_id,
"asset_name": asset_name,
"quantity": quantity
})
utxos.append({
"tx_hash": tx_hash,
"tx_index": int(tx_index),
"lovelace": lovelace,
"tokens": tokens
})
result = {
"address": address,
"utxo_count": len(utxos),
"total_lovelace": total_lovelace,
"total_ada": total_lovelace / 1_000_000,
"utxos": utxos
}
await set_cached(cache_key, result, CACHE_TTLS["utxos"])
return result
@app.post("/v1/tx/submit")
async def submit_transaction(
request: TxSubmitRequest,
auth: dict = Depends(get_auth_context)
):
"""
Submit a signed transaction to the network.
Accepts hex or base64 encoded CBOR transaction.
"""
# Check tx submission rate limit
allowed, retry_after = await check_rate_limit(auth["identifier"], auth["tier"], "tx_submit")
if not allowed:
if auth["tier"] == "anonymous":
raise HTTPException(
status_code=403,
detail={"error": "forbidden", "message": "Transaction submission requires an API key"}
)
raise HTTPException(
status_code=429,
detail={"error": "rate_limit_exceeded", "retry_after": retry_after}
)
# Decode the transaction
try:
tx_bytes = decode_hex_or_base64(request.tx)
except ValueError as e:
raise HTTPException(
status_code=400,
detail={"error": "invalid_encoding", "message": str(e)}
)
# Write to temp file for cardano-cli
with tempfile.NamedTemporaryFile(suffix=".signed", delete=False) as f:
f.write(tx_bytes)
tx_file = f.name
try:
# Submit via cardano-cli
success, stdout, stderr = run_cardano_cli([
"transaction", "submit",
"--tx-file", tx_file,
"--mainnet"
], timeout=60)
if not success:
if "Network.Socket.connect" in stderr or "does not exist" in stderr:
raise HTTPException(
status_code=503,
detail={"error": "node_unavailable", "message": "Cardano node not available"}
)
# Parse common errors
if "OutsideValidityIntervalUTxO" in stderr:
raise HTTPException(
status_code=400,
detail={"error": "validity_interval", "message": "Transaction validity interval expired"}
)
if "BadInputsUTxO" in stderr:
raise HTTPException(
status_code=400,
detail={"error": "bad_inputs", "message": "One or more inputs already spent"}
)
if "ValueNotConservedUTxO" in stderr:
raise HTTPException(
status_code=400,
detail={"error": "value_not_conserved", "message": "Input/output value mismatch"}
)
raise HTTPException(
status_code=400,
detail={"error": "submit_failed", "message": stderr.strip()}
)
# Calculate tx hash from the CBOR
# The hash is blake2b-256 of the transaction body
tx_hash = hashlib.blake2b(tx_bytes, digest_size=32).hexdigest()
# Try to get actual hash from output
if "Transaction successfully submitted" in stdout or success:
# cardano-cli doesn't output the hash, but we calculated it
pass
logger.info(f"Transaction submitted: {tx_hash[:16]}... by {auth['identifier']}")
return TxSubmitResponse(
tx_hash=tx_hash,
message="Transaction successfully submitted"
)
finally:
# Clean up temp file
try:
os.unlink(tx_file)
except:
pass
@app.get("/v1/protocol-params")
async def get_protocol_params(auth: dict = Depends(get_auth_context)):
"""
Get current epoch protocol parameters from the node.
Cached for 5 minutes.
"""
global protocol_params_cache
# Check cache
if protocol_params_cache["data"] and protocol_params_cache["expires"] > time.time():
return protocol_params_cache["data"]
# Query from node
success, stdout, stderr = run_cardano_cli([
"query", "protocol-parameters",
"--mainnet"
])
if not success:
if "Network.Socket.connect" in stderr or "does not exist" in stderr:
raise HTTPException(
status_code=503,
detail={"error": "node_unavailable", "message": "Cardano node not available"}
)
raise HTTPException(
status_code=500,
detail={"error": "node_error", "message": stderr}
)
try:
params = json.loads(stdout)
except json.JSONDecodeError:
raise HTTPException(
status_code=500,
detail={"error": "parse_error", "message": "Failed to parse protocol parameters"}
)
# Cache for 5 minutes
protocol_params_cache = {
"data": params,
"expires": time.time() + CACHE_TTLS["protocol_params"]
}
return params
# ============ Auth Endpoints (TRP-Gated) ============
@app.post("/v1/auth/challenge")
async def create_auth_challenge(request: AuthChallengeRequest):
"""
Create a challenge nonce for wallet signature verification.
The nonce must be signed with CIP-8 and submitted to /v1/auth/verify.
"""
address = request.address
# Validate address format
if not address.startswith(("addr1", "addr_test1")):
raise HTTPException(
status_code=400,
detail={"error": "invalid_address", "message": "Invalid Cardano address format"}
)
# Generate nonce
nonce = secrets.token_hex(32)
expires_at = datetime.now(timezone.utc) + timedelta(minutes=5)
# Store in Redis with TTL
await redis_client.setex(
f"auth_challenge:{address}:{nonce}",
300, # 5 minutes TTL
json.dumps({"address": address, "created_at": datetime.now(timezone.utc).isoformat()})
)
return AuthChallengeResponse(
nonce=nonce,
expires_at=expires_at.isoformat()
)
@app.post("/v1/auth/verify")
async def verify_auth(request: AuthVerifyRequest):
"""
Verify CIP-8 signature and issue API key based on TRP balance.
Tiers:
- 0 TRP: anonymous (no key issued)
- 50+ TRP: standard (100 req/min)
- 500+ TRP: elevated (1000 req/min)
"""
address = request.address
nonce = request.nonce
# Check nonce exists and hasn't expired
challenge_key = f"auth_challenge:{address}:{nonce}"
challenge_data = await redis_client.get(challenge_key)
if not challenge_data:
raise HTTPException(
status_code=400,
detail={"error": "invalid_nonce", "message": "Nonce expired or invalid"}
)
# Delete nonce (one-time use)
await redis_client.delete(challenge_key)
# Verify CIP-8 signature
if not verify_cip8_signature(address, nonce, request.signature, request.key):
raise HTTPException(
status_code=401,
detail={"error": "invalid_signature", "message": "Signature verification failed"}
)
# Get TRP balance
trp_balance = await get_trp_balance(address)
tier = get_tier_from_trp_balance(trp_balance)
# Don't issue key for anonymous tier
if tier == "anonymous":
raise HTTPException(
status_code=403,
detail={
"error": "insufficient_trp",
"message": f"Minimum {TRP_TIERS['standard']} TRP required for API key",
"trp_balance": trp_balance,
"required": TRP_TIERS["standard"]
}
)
# Generate API key
new_key = f"capi_{secrets.token_hex(24)}"
# Store key info
await redis_client.hset(f"apikey:{new_key}", mapping={
"label": f"TRP-gated:{address[:20]}...",
"tier": tier,
"owner": address,
"trp_balance": str(trp_balance),
"created_at": datetime.now(timezone.utc).isoformat()
})
logger.info(f"Issued {tier} API key for {address[:20]}... (TRP: {trp_balance})")
return AuthVerifyResponse(
api_key=new_key,
tier=tier,
trp_balance=trp_balance
)
@app.post("/v1/auth/refresh")
async def refresh_auth(
x_api_key: str = Header(..., description="API key to refresh"),
auth: dict = Depends(get_auth_context)
):
"""
Re-check TRP balance and upgrade/downgrade tier for an existing key.
"""
if auth["tier"] == "anonymous":
raise HTTPException(
status_code=401,
detail={"error": "unauthorized", "message": "Valid API key required"}
)
if auth["tier"] == "master":
raise HTTPException(
status_code=400,
detail={"error": "invalid_request", "message": "Cannot refresh master key"}
)
owner = auth.get("owner")
if not owner:
raise HTTPException(
status_code=400,
detail={"error": "not_trp_gated", "message": "This key is not TRP-gated"}
)
# Get current TRP balance
trp_balance = await get_trp_balance(owner)
new_tier = get_tier_from_trp_balance(trp_balance)
old_tier = auth["tier"]
# Update key info
await redis_client.hset(f"apikey:{x_api_key}", mapping={
"tier": new_tier,
"trp_balance": str(trp_balance)
})
tier_changed = new_tier != old_tier
message = f"Tier {'upgraded' if new_tier > old_tier else 'downgraded'} from {old_tier} to {new_tier}" if tier_changed else "Tier unchanged"
return {
"api_key": x_api_key,
"tier": new_tier,
"previous_tier": old_tier,
"trp_balance": trp_balance,
"tier_changed": tier_changed,
"message": message
}
# ============ Block Endpoints ============
@app.get("/v1/block/latest")
@ -853,6 +1538,7 @@ async def list_api_keys(auth: dict = Depends(require_master_key)):
"label": data.get("label"),
"tier": data.get("tier"),
"owner": data.get("owner") or None,
"trp_balance": int(data.get("trp_balance", 0)),
"created_at": data.get("created_at")
})
if cursor == 0:
@ -860,6 +1546,13 @@ async def list_api_keys(auth: dict = Depends(require_master_key)):
return {"keys": keys, "count": len(keys)}
@app.post("/admin/refresh-tiers")
async def admin_refresh_tiers(auth: dict = Depends(require_master_key)):
"""Manually trigger TRP tier refresh for all keys."""
await refresh_all_trp_tiers()
return {"status": "ok", "message": "TRP tier refresh completed"}
@app.get("/admin/stats")
async def get_admin_stats(auth: dict = Depends(require_master_key)):
"""Get API usage statistics."""
@ -868,17 +1561,21 @@ async def get_admin_stats(auth: dict = Depends(require_master_key)):
active_users = 0
cursor = 0
while True:
cursor, keys = await redis_client.scan(cursor, match=f"ratelimit:*:{current_minute}", count=100)
cursor, keys = await redis_client.scan(cursor, match=f"ratelimit:general:*:{current_minute}", count=100)
active_users += len(keys)
if cursor == 0:
break
# Count API keys
api_key_count = 0
# Count API keys by tier
api_key_counts = {"standard": 0, "elevated": 0, "total": 0}
cursor = 0
while True:
cursor, keys = await redis_client.scan(cursor, match="apikey:*", count=100)
api_key_count += len(keys)
for key in keys:
tier = await redis_client.hget(key, "tier")
api_key_counts["total"] += 1
if tier in api_key_counts:
api_key_counts[tier] += 1
if cursor == 0:
break
@ -887,12 +1584,16 @@ async def get_admin_stats(auth: dict = Depends(require_master_key)):
latest_block = await conn.fetchrow("SELECT block_no, time FROM block ORDER BY id DESC LIMIT 1")
tx_count = await conn.fetchrow("SELECT COUNT(*) as count FROM tx")
# Check node status
node_connected = os.path.exists(CARDANO_NODE_SOCKET_PATH)
return {
"active_users_this_minute": active_users,
"total_api_keys": api_key_count,
"api_keys": api_key_counts,
"db_sync_block": latest_block["block_no"] if latest_block else 0,
"db_sync_time": latest_block["time"].isoformat() if latest_block and latest_block["time"] else None,
"total_transactions_synced": tx_count["count"] if tx_count else 0
"total_transactions_synced": tx_count["count"] if tx_count else 0,
"node_socket_exists": node_connected
}

View file

@ -4,3 +4,6 @@ asyncpg==0.30.0
redis==5.0.0
pydantic==2.9.0
python-dotenv==1.0.0
pycardano==0.11.0
cbor2==5.6.0
PyNaCl==1.5.0