fix: Security hardening pass 3 - body stream limit, challenge rate limit, COSE validation, TRP refresh batching

- Fix #17: Body size middleware reads actual body stream (catches chunked/missing Content-Length)
- Fix #17: Challenge flood prevention - per-address rate limit (5/min) + outstanding limit (10 max)
- Fix #18: COSE algorithm validation (must be EdDSA/-8)
- Fix #18: COSE element type validation (protected, sig must be bytes; key must be 32 bytes)
- Fix #19: TRP refresh uses Redis Set + batched Postgres query (eliminates N+1)
- Fix #20: Pagination page parameter capped at 10000 across all endpoints

Bumped version to 2.3.0
This commit is contained in:
Kayos 2026-03-21 10:34:25 -07:00
parent 038cbbb9c6
commit 11b551b0fe

196
main.py
View file

@ -41,6 +41,14 @@ Security hardening pass 2 (2026-03-21):
- Fix #14: cbor2 bumped to >=5.6.5 (CVE-2024-26134)
- Fix #15: Fixed holder count query (was using GROUP BY + COUNT DISTINCT incorrectly)
- Fix #16: Async lock for protocol params cache to prevent stampede
Security hardening pass 3 (2026-03-21):
- Fix #17: Body size middleware now reads actual body stream (catches chunked/missing Content-Length)
- Fix #17: Challenge flood prevention - per-address rate limit (5/min) + outstanding limit (10 max)
- Fix #18: COSE algorithm validation (must be EdDSA/-8)
- Fix #18: COSE element type validation (protected, sig must be bytes; key must be 32 bytes)
- Fix #19: TRP refresh uses Redis Set + batched Postgres query (no more N+1)
- Fix #20: Pagination page parameter capped at 10000 across all endpoints
"""
import os
@ -240,6 +248,12 @@ def verify_cip8_signature(address: str, nonce: str, signature_hex: str, key_hex:
# Decode the key (should be a 32-byte Ed25519 public key)
key_bytes = bytes.fromhex(key_hex)
# Fix #18: Validate key_bytes type and length
if not isinstance(key_bytes, bytes) or len(key_bytes) != 32:
logger.warning(f"CIP-8 verification failed: invalid key length {len(key_bytes) if isinstance(key_bytes, bytes) else 'non-bytes'}")
return False
vkey = VerificationKey.from_primitive(key_bytes)
# Verify the public key hashes to the address
@ -276,6 +290,27 @@ def verify_cip8_signature(address: str, nonce: str, signature_hex: str, key_hex:
protected, unprotected, payload, sig = cose_sign1
# Fix #18: Validate COSE element types
if not isinstance(protected, bytes) or not isinstance(sig, bytes):
logger.warning("CIP-8 verification failed: invalid COSE_Sign1 element types")
return False
# Fix #18: Decode and validate protected header - must be EdDSA (-8)
try:
protected_decoded = cbor2.loads(protected)
except Exception:
logger.warning("CIP-8 verification failed: could not decode protected header")
return False
if not isinstance(protected_decoded, dict):
logger.warning("CIP-8 verification failed: protected header is not a map")
return False
alg = protected_decoded.get(1) # Key 1 = algorithm in COSE header map
if alg != -8: # -8 = EdDSA per COSE spec
logger.warning(f"CIP-8 verification failed: invalid algorithm {alg}, expected -8 (EdDSA)")
return False
# Fix #12: Reject empty payloads - nonce verification must happen
if not payload:
logger.warning("CIP-8 verification rejected: empty payload")
@ -333,35 +368,75 @@ async def refresh_trp_tiers_task():
async def refresh_all_trp_tiers():
"""Check all TRP-gated keys and update tiers if balance changed."""
"""
Check all TRP-gated keys and update tiers if balance changed.
Fix #19: Uses Redis Set + batched Postgres query to avoid N+1.
"""
if not redis_client or not db_pool:
return
cursor = 0
# Fix #19: Get TRP-gated keys from dedicated set instead of SCAN
key_hashes = await redis_client.smembers("trp_gated_keys")
if not key_hashes:
return
# Collect all keys and their owners
keys_data = []
addresses = []
for key_hash in key_hashes:
data = await redis_client.hgetall(f"apikey:{key_hash}")
if not data:
# Key was deleted but still in set - clean up
await redis_client.srem("trp_gated_keys", key_hash)
continue
owner = data.get("owner")
if owner and owner.startswith("addr"):
keys_data.append({"key_hash": key_hash, "data": data, "owner": owner})
if owner not in addresses:
addresses.append(owner)
if not addresses:
return
# Fix #19: Batch query all TRP balances in one Postgres call
try:
async with db_pool.acquire() as conn:
results = await conn.fetch("""
SELECT txo.address, COALESCE(SUM(mto.quantity), 0) as quantity
FROM ma_tx_out mto
JOIN multi_asset ma ON ma.id = mto.ident
JOIN tx_out txo ON txo.id = mto.tx_out_id
LEFT JOIN tx_in txi ON txi.tx_out_id = txo.tx_id AND txi.tx_out_index = txo.index
WHERE txo.address = ANY($1::text[])
AND ma.policy = decode($2, 'hex')
AND txi.id IS NULL
GROUP BY txo.address
""", addresses, TRP_POLICY_ID)
# Build address -> balance map
balance_map = {r["address"]: int(r["quantity"]) for r in results}
except Exception as e:
logger.error(f"Error batch-querying TRP balances: {e}")
return
# Update tiers for each key
updated = 0
while True:
cursor, keys = await redis_client.scan(cursor, match="apikey:*", count=100)
for key in keys:
data = await redis_client.hgetall(key)
owner = data.get("owner")
if owner and owner.startswith("addr"):
# This is a TRP-gated key, check balance
try:
trp_balance = await get_trp_balance(owner)
new_tier = get_tier_from_trp_balance(trp_balance)
old_tier = data.get("tier", "anonymous")
if new_tier != old_tier:
await redis_client.hset(key, mapping={
"tier": new_tier,
"trp_balance": str(trp_balance)
})
logger.info(f"Updated tier for {owner}: {old_tier} -> {new_tier}")
updated += 1
except Exception as e:
logger.error(f"Error checking TRP balance for {owner}: {e}")
if cursor == 0:
break
for key_info in keys_data:
owner = key_info["owner"]
data = key_info["data"]
key_hash = key_info["key_hash"]
trp_balance = balance_map.get(owner, 0)
new_tier = get_tier_from_trp_balance(trp_balance)
old_tier = data.get("tier", "anonymous")
if new_tier != old_tier:
await redis_client.hset(f"apikey:{key_hash}", mapping={
"tier": new_tier,
"trp_balance": str(trp_balance)
})
logger.info(f"Updated tier for {owner}: {old_tier} -> {new_tier}")
updated += 1
if updated > 0:
logger.info(f"TRP tier refresh complete: {updated} keys updated")
@ -433,7 +508,7 @@ async def lifespan(app: FastAPI):
app = FastAPI(
title="Cardano Chain Data API",
description="REST API for querying Cardano blockchain data via db-sync and cardano-node",
version="2.2.0", # Bumped for security hardening pass 2
version="2.3.0", # Bumped for security hardening pass 3
lifespan=lifespan
)
@ -441,7 +516,10 @@ app = FastAPI(
# ============ Fix #11: Request Body Size Limit Middleware ============
class LimitBodySizeMiddleware(BaseHTTPMiddleware):
"""Limit request body size on tx submit to prevent DoS. Cardano max tx is ~16KB."""
"""
Limit request body size on tx submit to prevent DoS. Cardano max tx is ~16KB.
Fix #17: Actually reads body stream to catch chunked encoding / missing Content-Length.
"""
MAX_TX_SIZE = 65536 # 64KB - generous limit
async def dispatch(self, request: Request, call_next):
@ -452,6 +530,21 @@ class LimitBodySizeMiddleware(BaseHTTPMiddleware):
status_code=413,
content={"error": "payload_too_large", "message": "Transaction exceeds maximum size of 64KB"}
)
# Actually read and limit the body regardless of Content-Length header
# This catches chunked transfer encoding and missing Content-Length
body = await request.body()
if len(body) > self.MAX_TX_SIZE:
return JSONResponse(
status_code=413,
content={"error": "payload_too_large", "message": "Transaction exceeds maximum size of 64KB"}
)
# Cache the body so the route handler can read it (body can only be read once)
async def receive():
return {"type": "http.request", "body": body, "more_body": False}
request._receive = receive
return await call_next(request)
@ -554,6 +647,8 @@ async def get_api_key_info(key: str) -> Optional[dict]:
# Key expired, delete it and return None
logger.info(f"API key expired for owner {data.get('owner', 'unknown')}")
await redis_client.delete(f"apikey:{key_hash}")
# Fix #19: Also remove from TRP-gated keys set
await redis_client.srem("trp_gated_keys", key_hash)
return None
except Exception as e:
logger.error(f"Error parsing expires_at: {e}")
@ -1049,12 +1144,14 @@ async def get_protocol_params(auth: dict = Depends(require_standard_tier)):
# ============ Auth Endpoints (TRP-Gated) ============
@app.post("/v1/auth/challenge")
async def create_auth_challenge(request: AuthChallengeRequest):
async def create_auth_challenge(request: AuthChallengeRequest, req: Request):
"""
Create a challenge nonce for wallet signature verification.
The nonce must be signed with CIP-8 and submitted to /v1/auth/verify.
Fix #17: Per-address rate limit to prevent challenge flood / Redis memory exhaustion.
"""
address = request.address
address = request.address.strip()
# Fix #8: Validate address format
if not validate_address(address):
@ -1063,6 +1160,30 @@ async def create_auth_challenge(request: AuthChallengeRequest):
detail={"error": "invalid_address", "message": "Invalid Cardano address format"}
)
# Fix #17: Per-address challenge rate limit: 5 per minute
challenge_rate_key = f"challenge_rate:{address}:{int(time.time() // 60)}"
current_count = await redis_client.incr(challenge_rate_key)
if current_count == 1:
await redis_client.expire(challenge_rate_key, 60)
if current_count > 5:
raise HTTPException(
status_code=429,
detail={"error": "too_many_challenges", "message": "Too many challenge requests. Try again in a minute."}
)
# Fix #17: Enforce max outstanding challenges per address (prevent accumulation across minutes)
outstanding_key = f"challenge_count:{address}"
outstanding = await redis_client.incr(outstanding_key)
if outstanding == 1:
await redis_client.expire(outstanding_key, 300) # 5 min TTL matching nonce TTL
if outstanding > 10:
# Decrement since we're not actually issuing a challenge
await redis_client.decr(outstanding_key)
raise HTTPException(
status_code=429,
detail={"error": "too_many_challenges", "message": "Too many outstanding challenges for this address."}
)
# Generate nonce
nonce = secrets.token_hex(32)
expires_at = datetime.now(timezone.utc) + timedelta(minutes=5)
@ -1116,6 +1237,10 @@ async def verify_auth(request: AuthVerifyRequest):
detail={"error": "invalid_nonce", "message": "Nonce expired or already used"}
)
# Fix #17: Decrement outstanding challenge counter since nonce is now consumed
outstanding_key = f"challenge_count:{address}"
await redis_client.decr(outstanding_key)
# Verify CIP-8 signature
if not verify_cip8_signature(address, nonce, request.signature, request.key):
raise HTTPException(
@ -1157,6 +1282,9 @@ async def verify_auth(request: AuthVerifyRequest):
"expires_at": expires_at.isoformat() # Fix #3: Add expiry
})
# Fix #19: Track TRP-gated keys in a set for efficient refresh
await redis_client.sadd("trp_gated_keys", key_hash)
logger.info(f"Issued {tier} API key for {address[:20]}... (TRP: {trp_balance}, expires: {expires_at.isoformat()})")
# Return the raw key to the user (only time it's exposed)
@ -1367,7 +1495,7 @@ async def get_address_balance(address: str, auth: dict = Depends(get_auth_contex
@app.get("/v1/address/{address}/tokens")
async def get_address_tokens(
address: str,
page: int = Query(1, ge=1, description="Page number"),
page: int = Query(1, ge=1, le=10000, description="Page number (max 10000)"),
limit: int = Query(100, ge=1, le=1000, description="Results per page (max 1000)"),
auth: dict = Depends(get_auth_context)
):
@ -1442,7 +1570,7 @@ async def get_address_tokens(
@app.get("/v1/address/{address}/transactions")
async def get_address_transactions(
address: str,
page: int = Query(1, ge=1),
page: int = Query(1, ge=1, le=10000, description="Page number (max 10000)"),
limit: int = Query(20, ge=1, le=100),
order: str = Query("desc", regex="^(asc|desc)$"),
auth: dict = Depends(get_auth_context)
@ -1611,7 +1739,7 @@ async def get_transaction(tx_hash: str, auth: dict = Depends(get_auth_context)):
@app.get("/v1/asset/{policy_id}/info")
async def get_asset_info(
policy_id: str,
page: int = Query(1, ge=1, description="Page number"),
page: int = Query(1, ge=1, le=10000, description="Page number (max 10000)"),
limit: int = Query(100, ge=1, le=500, description="Results per page (max 500)"),
auth: dict = Depends(get_auth_context)
):
@ -1839,6 +1967,10 @@ async def revoke_api_key(key: str, auth: dict = Depends(require_master_key)):
deleted = await redis_client.delete(f"apikey:{key_hash}")
if not deleted:
raise HTTPException(status_code=404, detail={"error": "not_found", "message": "API key not found"})
# Fix #19: Remove from TRP-gated keys set
await redis_client.srem("trp_gated_keys", key_hash)
return {"status": "revoked", "key": key[:16] + "..."}