v0.1 — clawdforge service scaffold

LAN-only HTTP service that runs claude -p subprocess on behalf of Sulkta apps.
Bearer token + IP allowlist gated. SQLite-backed token registry + run audit log.

- POST /run               run a prompt, return parsed result
- POST /files             upload a file, get a file_token to attach to /run
- POST /admin/tokens      mint per-app tokens (admin-bootstrap-token gated)
- GET  /admin/tokens      list, DELETE /admin/tokens/<name>  revoke
- GET  /healthz           liveness + claude --version smoke

Container = node:22 + npm-installed @anthropic-ai/claude-code + uvicorn/FastAPI
wrapper. Persistent volumes for /data (sqlite + run staging) and /root/.claude
(subscription auth — survives container rebuilds; auth via 'docker exec -it
clawdforge claude /login' once). Compose binds 192.168.0.5:8800 only — no
public proxy.

First consumer = cauldron (about to land).
This commit is contained in:
Kayos 2026-04-28 16:46:44 -07:00
parent a7be5a7702
commit 44a8fe743f
12 changed files with 832 additions and 1 deletions

29
.env.example Normal file
View file

@ -0,0 +1,29 @@
# clawdforge — copy to .env on Lucy at /mnt/cache/appdata/secrets/clawdforge.env
# (chmod 600, root:root)
# Bind
BIND_HOST=0.0.0.0
BIND_PORT=8800
# Bootstrap admin token. Used to mint per-app tokens via /admin/tokens.
# Once the SQLite db has any token, this var becomes a "root override" and
# should be rotated or unset.
ADMIN_BOOTSTRAP_TOKEN=change-me-32-bytes-of-entropy
# IP allowlist applied to ALL requests. CIDR list, comma-separated.
# 172.24.0.0/16 = sulkta bridge (where clawdforge sits with peer apps)
# 172.17.0.0/16 = docker0 default (some legacy apps still here)
# 192.168.0.0/24 = LAN clients
# Loopback always allowed.
ALLOW_CIDRS=172.24.0.0/16,172.17.0.0/16,192.168.0.0/24
# Default claude config (per-request override allowed)
CLAUDE_BIN=claude
DEFAULT_MODEL=sonnet
DEFAULT_TIMEOUT_SECS=120
# Run-staging area inside the container (don't change unless you also change compose mount)
RUNS_DIR=/data/runs
# SQLite db path (don't change unless you also change compose mount)
DB_PATH=/data/clawdforge.db

15
.gitignore vendored Normal file
View file

@ -0,0 +1,15 @@
__pycache__/
*.pyc
.env
.venv/
venv/
*.sqlite
*.db
.pytest_cache/
.mypy_cache/
.ruff_cache/
*.log
.idea/
.vscode/
data/
runs/

35
Dockerfile Normal file
View file

@ -0,0 +1,35 @@
FROM node:22-bookworm-slim
# System deps + Python (claude code is npm; our wrapper is Python)
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 python3-pip python3-venv \
ca-certificates curl git \
&& rm -rf /var/lib/apt/lists/*
# Claude Code CLI
RUN npm install -g @anthropic-ai/claude-code
# Python deps in a venv
ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
WORKDIR /app
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY clawdforge /app/clawdforge
# Persistent volume mount points:
# /data -> sqlite + runs staging
# /root/.claude -> claude code auth (cobb runs `claude /login` once per container)
# /root/.config/claude -> alt config path some claude versions use
RUN mkdir -p /data /root/.claude /root/.config/claude
EXPOSE 8800
CMD ["uvicorn", "clawdforge.server:app", \
"--host", "0.0.0.0", "--port", "8800", \
"--workers", "1", \
"--proxy-headers", \
"--access-log"]

123
README.md
View file

@ -1,3 +1,124 @@
# clawdforge
LAN-only HTTP service that runs claude -p subprocess on behalf of Sulkta apps. Bearer token + IP allowlist gated.
LAN-only HTTP service that runs `claude -p` subprocess calls on behalf of Sulkta apps.
One container holds the Claude Code subscription auth; multiple apps consume via bearer
tokens + IP allowlist.
## Why
- **Auth in one place** — only this container needs to be `claude /login`'d, not every app
- **Smaller app images** — apps stay tiny Python/Go containers, no node/npm/claude-cli
- **Audit log** — every prompt + response chars + duration in one SQLite db
- **Reusable bone** — petalparse, cauldron, johnny5 all consume the same surface
## Surface
```
GET /healthz liveness + claude --version smoke
POST /run run a prompt, return parsed result
POST /files upload a file, get a file_token to pass to /run
POST /admin/tokens mint a per-app token (admin)
GET /admin/tokens list app tokens (admin)
DELETE /admin/tokens/<name> revoke a token (admin)
```
### `POST /run`
```json
{
"prompt": "Sterilize this ingredient line: 'about 2 cups of cooked white rice'",
"model": "sonnet",
"system": "You are a precise recipe parser. Always reply with valid JSON.",
"files": ["ff_..."],
"timeout_secs": 60
}
```
Returns:
```json
{
"ok": true,
"result": { "qty": 2, "unit": "cup", "food": "rice", "note": "cooked, white", "approx": true },
"duration_ms": 4321,
"stop_reason": "end_turn"
}
```
`result` is the inner `{"type":"result","result":"..."}` from `claude -p --output-format json`,
auto-stripped of code fences and JSON-parsed if possible. If the inner is not valid JSON, it's
returned as a string.
### `POST /files`
multipart/form-data, field `file`, optional `ttl_secs` (60..86400, default 3600).
Returns `{"file_token": "ff_...", "ttl_secs": 3600, "size": 12345}`.
Use that token in subsequent `/run` requests to attach the file via `claude -p --files`.
## Auth
Two layers:
1. **IP allowlist** — global CIDR list in `ALLOW_CIDRS` env. Loopback always allowed.
Per-app allowlist optional on top (mint with `ip_cidrs: [...]`).
2. **Bearer token**`Authorization: Bearer cf_<...>` for `/run` and `/files`,
`Authorization: Bearer <ADMIN_BOOTSTRAP_TOKEN>` for `/admin/*`.
Tokens are SHA-256 hashed in SQLite. The plaintext is shown ONCE at create time.
## Deploy
1. SSH to Lucy: `ssh lucy`
2. `mkdir -p /mnt/user/appdata/clawdforge/{data,claude-config,claude-alt-config}`
3. Drop `.env` at `/mnt/cache/appdata/secrets/clawdforge.env` (chmod 600, root:root) — see `.env.example`
4. Clone the repo to `/opt/stacks/clawdforge` (Lucy uses Gitea reverse-tunnel pattern)
5. `cd /opt/stacks/clawdforge && docker compose up -d --build`
6. **Auth Claude CLI** (one-time, persists on volume):
```
docker exec -it clawdforge claude /login
```
Walk through the device-auth flow. Credentials persist at `/root/.claude/` inside the
container, mapped to `/mnt/user/appdata/clawdforge/claude-config/` on host.
7. Smoke:
```
curl http://192.168.0.5:8800/healthz
```
Should report `claude_present: true` + a version string.
8. Mint a token for the first consumer:
```
curl -sS -X POST http://192.168.0.5:8800/admin/tokens \
-H "Authorization: Bearer $ADMIN_BOOTSTRAP_TOKEN" \
-H "Content-Type: application/json" \
-d '{"name":"cauldron","ip_cidrs":["172.24.0.0/16"]}'
```
Save the returned `token` into the consumer's env.
## Client snippet (Python)
```python
import os, requests
CF = "http://192.168.0.5:8800"
TOKEN = os.environ["CLAWDFORGE_TOKEN"]
r = requests.post(
f"{CF}/run",
headers={"Authorization": f"Bearer {TOKEN}"},
json={
"prompt": 'Reply with JSON: {"hello": "world"}',
"model": "sonnet",
"timeout_secs": 30,
},
timeout=60,
)
r.raise_for_status()
print(r.json()["result"]) # {'hello': 'world'}
```
## Notes
- The CLI is `@anthropic-ai/claude-code` (not the Python `anthropic` SDK).
- Default model is `sonnet`; per-request override via `model` field.
- Per-run working directory is staged under `RUNS_DIR` and torn down on exit, so
`claude` can't pollute the container's working tree.
- File uploads are scoped to the uploading app — token A can't reference token B's files.

0
clawdforge/__init__.py Normal file
View file

71
clawdforge/auth.py Normal file
View file

@ -0,0 +1,71 @@
"""Bearer token + IP allowlist enforcement."""
import ipaddress
from fastapi import Header, HTTPException, Request
from .store import Store
def _client_ip(request: Request) -> str:
return request.client.host if request.client else "0.0.0.0"
def ip_in_any(ip_str: str, cidrs: list[str]) -> bool:
try:
ip = ipaddress.ip_address(ip_str)
except ValueError:
return False
if ip.is_loopback:
return True
for cidr in cidrs:
try:
if ip in ipaddress.ip_network(cidr, strict=False):
return True
except ValueError:
continue
return False
class Auth:
"""Constructed once at app startup, holds config + store ref."""
def __init__(self, *, store: Store, global_cidrs: list[str], admin_token: str):
self.store = store
self.global_cidrs = global_cidrs
self.admin_token = admin_token
def require_global_ip(self, request: Request) -> None:
ip = _client_ip(request)
if not ip_in_any(ip, self.global_cidrs):
raise HTTPException(403, f"ip not in allowlist: {ip}")
def require_admin(self, request: Request, authorization: str | None) -> None:
self.require_global_ip(request)
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(401, "missing bearer")
token = authorization[7:].strip()
if not _const_eq(token, self.admin_token):
raise HTTPException(403, "admin auth failed")
def require_app(self, request: Request, authorization: str | None) -> dict:
"""Returns {'name': ..., 'ip_cidrs': [...]} on success."""
self.require_global_ip(request)
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(401, "missing bearer")
token = authorization[7:].strip()
rec = self.store.lookup_token(token)
if not rec:
raise HTTPException(403, "unknown or disabled token")
if rec["ip_cidrs"]:
ip = _client_ip(request)
if not ip_in_any(ip, rec["ip_cidrs"]):
raise HTTPException(403, f"ip not in app allowlist: {ip}")
return rec
def _const_eq(a: str, b: str) -> bool:
if len(a) != len(b):
return False
diff = 0
for x, y in zip(a.encode(), b.encode()):
diff |= x ^ y
return diff == 0

34
clawdforge/config.py Normal file
View file

@ -0,0 +1,34 @@
import os
from dataclasses import dataclass
@dataclass(frozen=True)
class Config:
bind_host: str
bind_port: int
admin_bootstrap_token: str
allow_cidrs: tuple[str, ...]
claude_bin: str
default_model: str
default_timeout_secs: int
runs_dir: str
db_path: str
def load() -> Config:
return Config(
bind_host=os.environ.get("BIND_HOST", "0.0.0.0"),
bind_port=int(os.environ.get("BIND_PORT", "8800")),
admin_bootstrap_token=os.environ["ADMIN_BOOTSTRAP_TOKEN"],
allow_cidrs=tuple(
c.strip() for c in os.environ.get("ALLOW_CIDRS", "").split(",") if c.strip()
),
claude_bin=os.environ.get("CLAUDE_BIN", "claude"),
default_model=os.environ.get("DEFAULT_MODEL", "sonnet"),
default_timeout_secs=int(os.environ.get("DEFAULT_TIMEOUT_SECS", "120")),
runs_dir=os.environ.get("RUNS_DIR", "/data/runs"),
db_path=os.environ.get("DB_PATH", "/data/clawdforge.db"),
)

129
clawdforge/runner.py Normal file
View file

@ -0,0 +1,129 @@
"""Wrap `claude -p ... --output-format json` and parse its result."""
import json
import os
import shutil
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
@dataclass
class RunResult:
ok: bool
result: object # parsed JSON or text
raw_stdout: str
raw_stderr: str
duration_ms: int
stop_reason: str | None
error: str | None = None
class Runner:
def __init__(self, *, claude_bin: str, default_model: str, default_timeout: int, runs_dir: str):
self.claude_bin = claude_bin
self.default_model = default_model
self.default_timeout = default_timeout
self.runs_dir = runs_dir
Path(runs_dir).mkdir(parents=True, exist_ok=True)
def run(
self,
*,
prompt: str,
model: str | None = None,
system: str | None = None,
files: list[str] | None = None,
timeout_secs: int | None = None,
) -> RunResult:
cmd = [
self.claude_bin,
"-p",
prompt,
"--output-format",
"json",
"--model",
model or self.default_model,
]
if system:
cmd += ["--append-system-prompt", system]
if files:
for f in files:
cmd += ["--files", f]
timeout = timeout_secs or self.default_timeout
# Stage a per-run dir so claude has a clean working directory
run_id = f"{int(time.time()*1000)}-{os.getpid()}"
cwd = Path(self.runs_dir) / run_id
cwd.mkdir(parents=True, exist_ok=True)
started = time.monotonic()
try:
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
cwd=str(cwd),
)
except subprocess.TimeoutExpired as e:
duration_ms = int((time.monotonic() - started) * 1000)
return RunResult(
ok=False,
result=None,
raw_stdout=(e.stdout or b"").decode("utf-8", "replace") if isinstance(e.stdout, bytes) else (e.stdout or ""),
raw_stderr=(e.stderr or b"").decode("utf-8", "replace") if isinstance(e.stderr, bytes) else (e.stderr or ""),
duration_ms=duration_ms,
stop_reason="timeout",
error=f"timeout after {timeout}s",
)
finally:
shutil.rmtree(cwd, ignore_errors=True)
duration_ms = int((time.monotonic() - started) * 1000)
if proc.returncode != 0:
return RunResult(
ok=False,
result=None,
raw_stdout=proc.stdout,
raw_stderr=proc.stderr,
duration_ms=duration_ms,
stop_reason="error",
error=f"claude exit {proc.returncode}",
)
# claude --output-format json wraps in {"type":"result","result":"...","stop_reason":...}
parsed: object
stop_reason: str | None = None
try:
outer = json.loads(proc.stdout)
if isinstance(outer, dict) and "result" in outer:
stop_reason = outer.get("stop_reason") or outer.get("subtype")
inner = (outer["result"] or "").strip()
if inner.startswith("```"):
# strip ```lang fenced block
parts = inner.split("\n", 1)
if len(parts) == 2:
inner = parts[1].rsplit("```", 1)[0].strip()
# Try to JSON-parse the inner; if it's not valid JSON, return as text
try:
parsed = json.loads(inner)
except json.JSONDecodeError:
parsed = inner
else:
parsed = outer
except json.JSONDecodeError:
# Outer wasn't even JSON. Return raw text.
parsed = proc.stdout.strip()
return RunResult(
ok=True,
result=parsed,
raw_stdout=proc.stdout,
raw_stderr=proc.stderr,
duration_ms=duration_ms,
stop_reason=stop_reason,
error=None,
)

202
clawdforge/server.py Normal file
View file

@ -0,0 +1,202 @@
"""FastAPI app exposing /run, /files, /admin/tokens, /healthz."""
import os
import time
from pathlib import Path
from typing import Annotated
from fastapi import FastAPI, Header, HTTPException, Request, UploadFile, File, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from .auth import Auth
from .config import load
from .runner import Runner
from .store import Store
cfg = load()
store = Store(cfg.db_path)
auth = Auth(
store=store,
global_cidrs=list(cfg.allow_cidrs),
admin_token=cfg.admin_bootstrap_token,
)
runner = Runner(
claude_bin=cfg.claude_bin,
default_model=cfg.default_model,
default_timeout=cfg.default_timeout_secs,
runs_dir=cfg.runs_dir,
)
app = FastAPI(title="clawdforge", version="0.1.0")
# ---------- schemas ----------------------------------------------------------
class RunRequest(BaseModel):
prompt: str = Field(min_length=1)
model: str | None = None
system: str | None = None
files: list[str] | None = None
timeout_secs: int | None = Field(default=None, ge=5, le=600)
class TokenCreateRequest(BaseModel):
name: str = Field(min_length=1, max_length=64, pattern=r"^[a-z0-9][a-z0-9_-]*$")
ip_cidrs: list[str] = Field(default_factory=list)
# ---------- endpoints --------------------------------------------------------
@app.get("/healthz")
def healthz(request: Request):
auth.require_global_ip(request)
# Smoke-check that claude binary exists and responds (--version is fast, no auth needed)
import shutil, subprocess
found = shutil.which(cfg.claude_bin) is not None
version = None
if found:
try:
r = subprocess.run([cfg.claude_bin, "--version"], capture_output=True, text=True, timeout=5)
version = (r.stdout or r.stderr or "").strip().splitlines()[0] if (r.stdout or r.stderr) else None
except Exception as e:
version = f"err: {e}"
return {"ok": True, "claude_present": found, "claude_version": version}
@app.post("/run")
def run(
request: Request,
body: RunRequest,
authorization: Annotated[str | None, Header()] = None,
):
rec = auth.require_app(request, authorization)
# Resolve any file_tokens to actual paths owned by this app
file_paths: list[str] = []
if body.files:
for ftoken in body.files:
path = store.resolve_file(ftoken, rec["name"])
if not path:
raise HTTPException(404, f"unknown or expired file token: {ftoken}")
file_paths.append(path)
started_at = int(time.time())
res = runner.run(
prompt=body.prompt,
model=body.model,
system=body.system,
files=file_paths or None,
timeout_secs=body.timeout_secs,
)
result_chars = len(str(res.result)) if res.result is not None else 0
store.log_run(
app_name=rec["name"],
started_at=started_at,
duration_ms=res.duration_ms,
model=body.model or cfg.default_model,
prompt_chars=len(body.prompt),
result_chars=result_chars,
ok=res.ok,
error=res.error,
file_count=len(file_paths),
)
if not res.ok:
return JSONResponse(
status_code=502,
content={
"ok": False,
"error": res.error,
"stderr": res.raw_stderr[-4000:],
"duration_ms": res.duration_ms,
"stop_reason": res.stop_reason,
},
)
return {
"ok": True,
"result": res.result,
"duration_ms": res.duration_ms,
"stop_reason": res.stop_reason,
}
@app.post("/files")
def upload_file(
request: Request,
file: Annotated[UploadFile, File()],
ttl_secs: Annotated[int, Form()] = 3600,
authorization: Annotated[str | None, Header()] = None,
):
rec = auth.require_app(request, authorization)
if ttl_secs < 60 or ttl_secs > 86400:
raise HTTPException(400, "ttl_secs out of range (60..86400)")
safe_name = (file.filename or "upload").replace("/", "_").replace("..", "_")
target_dir = Path(cfg.runs_dir) / "uploads" / rec["name"]
target_dir.mkdir(parents=True, exist_ok=True)
import secrets as _s
suffix = _s.token_hex(8)
target = target_dir / f"{int(time.time())}-{suffix}-{safe_name}"
with target.open("wb") as fh:
while True:
chunk = file.file.read(1024 * 1024)
if not chunk:
break
fh.write(chunk)
file_token = store.register_file(rec["name"], str(target), ttl_secs)
return {"file_token": file_token, "ttl_secs": ttl_secs, "size": target.stat().st_size}
@app.post("/admin/tokens")
def create_token(
request: Request,
body: TokenCreateRequest,
authorization: Annotated[str | None, Header()] = None,
):
auth.require_admin(request, authorization)
try:
token = store.create_token(body.name, body.ip_cidrs)
except Exception as e:
raise HTTPException(409, f"token create failed: {e}")
return {"name": body.name, "token": token, "ip_cidrs": body.ip_cidrs}
@app.get("/admin/tokens")
def list_tokens(
request: Request,
authorization: Annotated[str | None, Header()] = None,
):
auth.require_admin(request, authorization)
return {"tokens": store.list_tokens()}
@app.delete("/admin/tokens/{name}")
def revoke_token(
name: str,
request: Request,
authorization: Annotated[str | None, Header()] = None,
):
auth.require_admin(request, authorization)
if not store.revoke_token(name):
raise HTTPException(404, "no such token")
return {"ok": True}
@app.on_event("startup")
def _startup_gc():
# Best-effort GC of expired staged files at boot
try:
for p in store.gc_expired_files():
try:
os.remove(p)
except FileNotFoundError:
pass
except Exception:
pass

167
clawdforge/store.py Normal file
View file

@ -0,0 +1,167 @@
"""SQLite store for app tokens + run audit log."""
import sqlite3
import secrets
import time
from contextlib import contextmanager
from pathlib import Path
SCHEMA = """
CREATE TABLE IF NOT EXISTS app_tokens (
name TEXT PRIMARY KEY,
token_hash TEXT NOT NULL UNIQUE,
ip_cidrs TEXT NOT NULL DEFAULT '',
created_at INTEGER NOT NULL,
last_used INTEGER,
enabled INTEGER NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
app_name TEXT NOT NULL,
started_at INTEGER NOT NULL,
duration_ms INTEGER,
model TEXT,
prompt_chars INTEGER,
result_chars INTEGER,
ok INTEGER NOT NULL,
error TEXT,
file_count INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS runs_app_started ON runs(app_name, started_at);
CREATE TABLE IF NOT EXISTS files (
file_token TEXT PRIMARY KEY,
app_name TEXT NOT NULL,
path TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL
);
"""
def _hash(token: str) -> str:
import hashlib
return hashlib.sha256(token.encode()).hexdigest()
class Store:
def __init__(self, db_path: str):
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self.db_path = db_path
with self._conn() as c:
c.executescript(SCHEMA)
@contextmanager
def _conn(self):
conn = sqlite3.connect(self.db_path, isolation_level=None)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
# --- app tokens ---------------------------------------------------------
def create_token(self, name: str, ip_cidrs: list[str]) -> str:
token = "cf_" + secrets.token_urlsafe(32)
with self._conn() as c:
c.execute(
"INSERT INTO app_tokens (name, token_hash, ip_cidrs, created_at) VALUES (?,?,?,?)",
(name, _hash(token), ",".join(ip_cidrs), int(time.time())),
)
return token
def lookup_token(self, token: str) -> dict | None:
with self._conn() as c:
row = c.execute(
"SELECT name, ip_cidrs, enabled FROM app_tokens WHERE token_hash=?",
(_hash(token),),
).fetchone()
if not row or not row["enabled"]:
return None
c.execute(
"UPDATE app_tokens SET last_used=? WHERE token_hash=?",
(int(time.time()), _hash(token)),
)
return {
"name": row["name"],
"ip_cidrs": [s for s in row["ip_cidrs"].split(",") if s],
}
def list_tokens(self) -> list[dict]:
with self._conn() as c:
rows = c.execute(
"SELECT name, ip_cidrs, created_at, last_used, enabled FROM app_tokens ORDER BY name"
).fetchall()
return [dict(r) for r in rows]
def revoke_token(self, name: str) -> bool:
with self._conn() as c:
cur = c.execute("UPDATE app_tokens SET enabled=0 WHERE name=?", (name,))
return cur.rowcount > 0
# --- runs ---------------------------------------------------------------
def log_run(
self,
app_name: str,
started_at: int,
duration_ms: int,
model: str,
prompt_chars: int,
result_chars: int,
ok: bool,
error: str | None,
file_count: int,
) -> None:
with self._conn() as c:
c.execute(
"INSERT INTO runs (app_name, started_at, duration_ms, model, prompt_chars, result_chars, ok, error, file_count) VALUES (?,?,?,?,?,?,?,?,?)",
(
app_name,
started_at,
duration_ms,
model,
prompt_chars,
result_chars,
1 if ok else 0,
error,
file_count,
),
)
# --- files --------------------------------------------------------------
def register_file(self, app_name: str, path: str, ttl_secs: int) -> str:
token = "ff_" + secrets.token_urlsafe(24)
now = int(time.time())
with self._conn() as c:
c.execute(
"INSERT INTO files (file_token, app_name, path, created_at, expires_at) VALUES (?,?,?,?,?)",
(token, app_name, path, now, now + ttl_secs),
)
return token
def resolve_file(self, file_token: str, app_name: str) -> str | None:
with self._conn() as c:
row = c.execute(
"SELECT path, expires_at FROM files WHERE file_token=? AND app_name=?",
(file_token, app_name),
).fetchone()
if not row:
return None
if int(time.time()) > row["expires_at"]:
return None
return row["path"]
def gc_expired_files(self) -> list[str]:
now = int(time.time())
with self._conn() as c:
rows = c.execute(
"SELECT file_token, path FROM files WHERE expires_at<?", (now,)
).fetchall()
paths = [r["path"] for r in rows]
c.execute("DELETE FROM files WHERE expires_at<?", (now,))
return paths

24
compose.yml Normal file
View file

@ -0,0 +1,24 @@
services:
clawdforge:
build:
context: .
dockerfile: Dockerfile
image: clawdforge:local
container_name: clawdforge
restart: unless-stopped
env_file:
- /mnt/cache/appdata/secrets/clawdforge.env
volumes:
- /mnt/user/appdata/clawdforge/data:/data
- /mnt/user/appdata/clawdforge/claude-config:/root/.claude
- /mnt/user/appdata/clawdforge/claude-alt-config:/root/.config/claude
ports:
# LAN-only bind. 8800 picked to live near other forge-y services; bump if collides.
- "192.168.0.5:8800:8800"
- "127.0.0.1:8800:8800"
networks:
- sulkta
networks:
sulkta:
external: true

4
requirements.txt Normal file
View file

@ -0,0 +1,4 @@
fastapi==0.115.5
uvicorn[standard]==0.32.1
pydantic==2.9.2
python-multipart==0.0.17