clawdforge/clawdforge/store.py
Kayos dbbead261d v0.2.1: optional per-session model override (Cobb wants Opus on code-work paths)
- CreateSessionRequest gains optional `model` field (validated regex)
- AcpxManager.create + AcpxSession dataclass carry the model
- Subprocess env on both create and turn sets ANTHROPIC_MODEL=<model>
  when set; subprocesses inherit parent env unchanged when NULL
- store.insert_session takes optional model; sessions table grows a
  nullable model column via idempotent ALTER TABLE in Store.__init__
  so live deployments migrate on next boot
- POST /sessions response echoes model so callers can confirm
- session_events 'create' meta records the model

Backward-compatible: omitting model preserves current Sonnet-default
behavior. The 14 SDK Session APIs work unchanged; SDK updates can land
opportunistically when callers need to pin a model. crafting-table's
patcher pins 'opus' in a follow-up commit on the crafting-table side.
2026-04-29 12:35:33 -07:00

377 lines
13 KiB
Python

"""SQLite store for app tokens, run audit log, and (v0.2) ACPX session ledger."""
import json
import sqlite3
import secrets
import time
from contextlib import contextmanager
from pathlib import Path
SCHEMA = """
CREATE TABLE IF NOT EXISTS app_tokens (
name TEXT PRIMARY KEY,
token_hash TEXT NOT NULL UNIQUE,
ip_cidrs TEXT NOT NULL DEFAULT '',
created_at INTEGER NOT NULL,
last_used INTEGER,
enabled INTEGER NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
app_name TEXT NOT NULL,
started_at INTEGER NOT NULL,
duration_ms INTEGER,
model TEXT,
prompt_chars INTEGER,
result_chars INTEGER,
ok INTEGER NOT NULL,
error TEXT,
file_count INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS runs_app_started ON runs(app_name, started_at);
CREATE TABLE IF NOT EXISTS files (
file_token TEXT PRIMARY KEY,
app_name TEXT NOT NULL,
path TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL
);
-- v0.2: ACPX-backed multi-turn sessions ledger.
-- session_id is the value we pass to acpx as `--name` (a UUID we mint).
-- ACPX persists its own session metadata under ~/.acpx/sessions/*.json keyed by
-- (agentCommand, cwd, name); this table is clawdforge's per-app view of which
-- token spawned which session and when it was last used.
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
app_name TEXT NOT NULL,
agent TEXT NOT NULL DEFAULT 'claude',
cwd TEXT NOT NULL,
created_at INTEGER NOT NULL,
last_turn_at INTEGER,
turn_count INTEGER NOT NULL DEFAULT 0,
closed_at INTEGER,
meta_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_sessions_app_name ON sessions(app_name);
CREATE INDEX IF NOT EXISTS idx_sessions_closed_at ON sessions(closed_at);
-- v0.2: append-only audit of session-scoped events (parallel to `runs`).
CREATE TABLE IF NOT EXISTS session_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
app_name TEXT NOT NULL,
event TEXT NOT NULL,
duration_ms INTEGER,
meta_json TEXT,
created_at INTEGER NOT NULL,
FOREIGN KEY (session_id) REFERENCES sessions(session_id)
);
CREATE INDEX IF NOT EXISTS idx_session_events_sid ON session_events(session_id);
CREATE INDEX IF NOT EXISTS idx_session_events_app ON session_events(app_name);
"""
def _hash(token: str) -> str:
import hashlib
return hashlib.sha256(token.encode()).hexdigest()
class Store:
def __init__(self, db_path: str):
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self.db_path = db_path
with self._conn() as c:
c.executescript(SCHEMA)
self._migrate_v0_2_1(c)
def _migrate_v0_2_1(self, c):
"""Idempotent additive migrations layered on top of SCHEMA.
Each ALTER TABLE is wrapped in try/except for OperationalError so
re-running on an already-migrated DB is a no-op. New deployments
get the columns via this same path immediately after CREATE TABLE.
"""
# Optional per-session model override (e.g. 'opus' / 'sonnet').
# NULL = let underlying claude CLI default decide. Cobb's intent
# 2026-04-29: crafting-table's patcher pins 'opus' per session so
# code-work prompts get Opus's long-context reasoning while other
# consumers (cauldron) keep using whatever the CLI default is.
try:
c.execute("ALTER TABLE sessions ADD COLUMN model TEXT")
except sqlite3.OperationalError:
pass # column already present — re-run on already-migrated DB
@contextmanager
def _conn(self):
conn = sqlite3.connect(self.db_path, isolation_level=None)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
# --- app tokens ---------------------------------------------------------
def create_token(self, name: str, ip_cidrs: list[str]) -> str:
token = "cf_" + secrets.token_urlsafe(32)
with self._conn() as c:
c.execute(
"INSERT INTO app_tokens (name, token_hash, ip_cidrs, created_at) VALUES (?,?,?,?)",
(name, _hash(token), ",".join(ip_cidrs), int(time.time())),
)
return token
def lookup_token(self, token: str) -> dict | None:
with self._conn() as c:
row = c.execute(
"SELECT name, ip_cidrs, enabled FROM app_tokens WHERE token_hash=?",
(_hash(token),),
).fetchone()
if not row or not row["enabled"]:
return None
c.execute(
"UPDATE app_tokens SET last_used=? WHERE token_hash=?",
(int(time.time()), _hash(token)),
)
return {
"name": row["name"],
"ip_cidrs": [s for s in row["ip_cidrs"].split(",") if s],
}
def list_tokens(self) -> list[dict]:
with self._conn() as c:
rows = c.execute(
"SELECT name, ip_cidrs, created_at, last_used, enabled FROM app_tokens ORDER BY name"
).fetchall()
return [dict(r) for r in rows]
def revoke_token(self, name: str) -> bool:
with self._conn() as c:
cur = c.execute("UPDATE app_tokens SET enabled=0 WHERE name=?", (name,))
return cur.rowcount > 0
# --- runs ---------------------------------------------------------------
def log_run(
self,
app_name: str,
started_at: int,
duration_ms: int,
model: str,
prompt_chars: int,
result_chars: int,
ok: bool,
error: str | None,
file_count: int,
) -> None:
with self._conn() as c:
c.execute(
"INSERT INTO runs (app_name, started_at, duration_ms, model, prompt_chars, result_chars, ok, error, file_count) VALUES (?,?,?,?,?,?,?,?,?)",
(
app_name,
started_at,
duration_ms,
model,
prompt_chars,
result_chars,
1 if ok else 0,
error,
file_count,
),
)
# --- files --------------------------------------------------------------
def register_file(self, app_name: str, path: str, ttl_secs: int) -> str:
token = "ff_" + secrets.token_urlsafe(24)
now = int(time.time())
with self._conn() as c:
c.execute(
"INSERT INTO files (file_token, app_name, path, created_at, expires_at) VALUES (?,?,?,?,?)",
(token, app_name, path, now, now + ttl_secs),
)
return token
def resolve_file(self, file_token: str, app_name: str) -> str | None:
with self._conn() as c:
row = c.execute(
"SELECT path, expires_at FROM files WHERE file_token=? AND app_name=?",
(file_token, app_name),
).fetchone()
if not row:
return None
if int(time.time()) > row["expires_at"]:
return None
return row["path"]
def gc_expired_files(self) -> list[str]:
now = int(time.time())
with self._conn() as c:
rows = c.execute(
"SELECT file_token, path FROM files WHERE expires_at<?", (now,)
).fetchall()
paths = [r["path"] for r in rows]
c.execute("DELETE FROM files WHERE expires_at<?", (now,))
return paths
# --- sessions (v0.2) ---------------------------------------------------
def insert_session(
self,
*,
session_id: str,
app_name: str,
agent: str,
cwd: str,
model: str | None = None,
meta: dict | None = None,
) -> dict:
now = int(time.time())
meta_json = json.dumps(meta) if meta is not None else None
with self._conn() as c:
c.execute(
"INSERT INTO sessions (session_id, app_name, agent, model, cwd, created_at, meta_json) VALUES (?,?,?,?,?,?,?)",
(session_id, app_name, agent, model, cwd, now, meta_json),
)
return {
"session_id": session_id,
"app_name": app_name,
"agent": agent,
"cwd": cwd,
"created_at": now,
"last_turn_at": None,
"turn_count": 0,
"closed_at": None,
"meta": meta,
}
def get_session(self, session_id: str) -> dict | None:
with self._conn() as c:
row = c.execute(
"SELECT * FROM sessions WHERE session_id=?", (session_id,)
).fetchone()
if not row:
return None
d = dict(row)
mj = d.pop("meta_json", None)
d["meta"] = json.loads(mj) if mj else None
return d
def list_sessions_for_app(self, app_name: str, *, include_closed: bool = True) -> list[dict]:
sql = "SELECT * FROM sessions WHERE app_name=?"
params: list = [app_name]
if not include_closed:
sql += " AND closed_at IS NULL"
sql += " ORDER BY created_at DESC"
with self._conn() as c:
rows = c.execute(sql, params).fetchall()
out = []
for r in rows:
d = dict(r)
mj = d.pop("meta_json", None)
d["meta"] = json.loads(mj) if mj else None
out.append(d)
return out
def count_open_sessions(self) -> int:
with self._conn() as c:
r = c.execute("SELECT COUNT(*) AS n FROM sessions WHERE closed_at IS NULL").fetchone()
return int(r["n"]) if r else 0
def mark_session_turn(self, session_id: str, *, now: int | None = None) -> None:
now = now or int(time.time())
with self._conn() as c:
c.execute(
"UPDATE sessions SET last_turn_at=?, turn_count=turn_count+1 WHERE session_id=?",
(now, session_id),
)
def mark_session_closed(self, session_id: str, *, now: int | None = None) -> bool:
"""Returns True if this call was the one that flipped closed_at, False if it was already closed or missing."""
now = now or int(time.time())
with self._conn() as c:
cur = c.execute(
"UPDATE sessions SET closed_at=? WHERE session_id=? AND closed_at IS NULL",
(now, session_id),
)
return cur.rowcount > 0
def hard_delete_session(self, session_id: str) -> bool:
with self._conn() as c:
cur = c.execute("DELETE FROM sessions WHERE session_id=?", (session_id,))
return cur.rowcount > 0
def find_stale_open_sessions(self, *, ttl_cutoff: int) -> list[dict]:
"""Open sessions where (last_turn_at OR created_at) < ttl_cutoff."""
with self._conn() as c:
rows = c.execute(
"""
SELECT * FROM sessions
WHERE closed_at IS NULL
AND (
(last_turn_at IS NOT NULL AND last_turn_at < ?)
OR (last_turn_at IS NULL AND created_at < ?)
)
""",
(ttl_cutoff, ttl_cutoff),
).fetchall()
out = []
for r in rows:
d = dict(r)
mj = d.pop("meta_json", None)
d["meta"] = json.loads(mj) if mj else None
out.append(d)
return out
def find_hard_deletable_sessions(self, *, hard_cutoff: int) -> list[dict]:
"""Closed sessions whose closed_at < hard_cutoff."""
with self._conn() as c:
rows = c.execute(
"SELECT * FROM sessions WHERE closed_at IS NOT NULL AND closed_at < ?",
(hard_cutoff,),
).fetchall()
out = []
for r in rows:
d = dict(r)
mj = d.pop("meta_json", None)
d["meta"] = json.loads(mj) if mj else None
out.append(d)
return out
# --- session_events (v0.2) ---------------------------------------------
def log_session_event(
self,
*,
session_id: str,
app_name: str,
event: str,
duration_ms: int | None = None,
meta: dict | None = None,
) -> None:
with self._conn() as c:
c.execute(
"INSERT INTO session_events (session_id, app_name, event, duration_ms, meta_json, created_at) VALUES (?,?,?,?,?,?)",
(
session_id,
app_name,
event,
duration_ms,
json.dumps(meta) if meta is not None else None,
int(time.time()),
),
)
def list_session_events(self, session_id: str) -> list[dict]:
with self._conn() as c:
rows = c.execute(
"SELECT * FROM session_events WHERE session_id=? ORDER BY id ASC",
(session_id,),
).fetchall()
return [dict(r) for r in rows]