clawdforge/clawdforge/store.py
Kayos 44a8fe743f v0.1 — clawdforge service scaffold
LAN-only HTTP service that runs claude -p subprocess on behalf of Sulkta apps.
Bearer token + IP allowlist gated. SQLite-backed token registry + run audit log.

- POST /run               run a prompt, return parsed result
- POST /files             upload a file, get a file_token to attach to /run
- POST /admin/tokens      mint per-app tokens (admin-bootstrap-token gated)
- GET  /admin/tokens      list, DELETE /admin/tokens/<name>  revoke
- GET  /healthz           liveness + claude --version smoke

Container = node:22 + npm-installed @anthropic-ai/claude-code + uvicorn/FastAPI
wrapper. Persistent volumes for /data (sqlite + run staging) and /root/.claude
(subscription auth — survives container rebuilds; auth via 'docker exec -it
clawdforge claude /login' once). Compose binds 192.168.0.5:8800 only — no
public proxy.

First consumer = cauldron (about to land).
2026-04-28 16:46:44 -07:00

167 lines
5.3 KiB
Python

"""SQLite store for app tokens + run audit log."""
import sqlite3
import secrets
import time
from contextlib import contextmanager
from pathlib import Path
SCHEMA = """
CREATE TABLE IF NOT EXISTS app_tokens (
name TEXT PRIMARY KEY,
token_hash TEXT NOT NULL UNIQUE,
ip_cidrs TEXT NOT NULL DEFAULT '',
created_at INTEGER NOT NULL,
last_used INTEGER,
enabled INTEGER NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
app_name TEXT NOT NULL,
started_at INTEGER NOT NULL,
duration_ms INTEGER,
model TEXT,
prompt_chars INTEGER,
result_chars INTEGER,
ok INTEGER NOT NULL,
error TEXT,
file_count INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS runs_app_started ON runs(app_name, started_at);
CREATE TABLE IF NOT EXISTS files (
file_token TEXT PRIMARY KEY,
app_name TEXT NOT NULL,
path TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL
);
"""
def _hash(token: str) -> str:
import hashlib
return hashlib.sha256(token.encode()).hexdigest()
class Store:
def __init__(self, db_path: str):
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self.db_path = db_path
with self._conn() as c:
c.executescript(SCHEMA)
@contextmanager
def _conn(self):
conn = sqlite3.connect(self.db_path, isolation_level=None)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
# --- app tokens ---------------------------------------------------------
def create_token(self, name: str, ip_cidrs: list[str]) -> str:
token = "cf_" + secrets.token_urlsafe(32)
with self._conn() as c:
c.execute(
"INSERT INTO app_tokens (name, token_hash, ip_cidrs, created_at) VALUES (?,?,?,?)",
(name, _hash(token), ",".join(ip_cidrs), int(time.time())),
)
return token
def lookup_token(self, token: str) -> dict | None:
with self._conn() as c:
row = c.execute(
"SELECT name, ip_cidrs, enabled FROM app_tokens WHERE token_hash=?",
(_hash(token),),
).fetchone()
if not row or not row["enabled"]:
return None
c.execute(
"UPDATE app_tokens SET last_used=? WHERE token_hash=?",
(int(time.time()), _hash(token)),
)
return {
"name": row["name"],
"ip_cidrs": [s for s in row["ip_cidrs"].split(",") if s],
}
def list_tokens(self) -> list[dict]:
with self._conn() as c:
rows = c.execute(
"SELECT name, ip_cidrs, created_at, last_used, enabled FROM app_tokens ORDER BY name"
).fetchall()
return [dict(r) for r in rows]
def revoke_token(self, name: str) -> bool:
with self._conn() as c:
cur = c.execute("UPDATE app_tokens SET enabled=0 WHERE name=?", (name,))
return cur.rowcount > 0
# --- runs ---------------------------------------------------------------
def log_run(
self,
app_name: str,
started_at: int,
duration_ms: int,
model: str,
prompt_chars: int,
result_chars: int,
ok: bool,
error: str | None,
file_count: int,
) -> None:
with self._conn() as c:
c.execute(
"INSERT INTO runs (app_name, started_at, duration_ms, model, prompt_chars, result_chars, ok, error, file_count) VALUES (?,?,?,?,?,?,?,?,?)",
(
app_name,
started_at,
duration_ms,
model,
prompt_chars,
result_chars,
1 if ok else 0,
error,
file_count,
),
)
# --- files --------------------------------------------------------------
def register_file(self, app_name: str, path: str, ttl_secs: int) -> str:
token = "ff_" + secrets.token_urlsafe(24)
now = int(time.time())
with self._conn() as c:
c.execute(
"INSERT INTO files (file_token, app_name, path, created_at, expires_at) VALUES (?,?,?,?,?)",
(token, app_name, path, now, now + ttl_secs),
)
return token
def resolve_file(self, file_token: str, app_name: str) -> str | None:
with self._conn() as c:
row = c.execute(
"SELECT path, expires_at FROM files WHERE file_token=? AND app_name=?",
(file_token, app_name),
).fetchone()
if not row:
return None
if int(time.time()) > row["expires_at"]:
return None
return row["path"]
def gc_expired_files(self) -> list[str]:
now = int(time.time())
with self._conn() as c:
rows = c.execute(
"SELECT file_token, path FROM files WHERE expires_at<?", (now,)
).fetchall()
paths = [r["path"] for r in rows]
c.execute("DELETE FROM files WHERE expires_at<?", (now,))
return paths