crafting-table/crafting_table/patcher.py
Cobb Hayes b335405c02 Public-flip audit: generalize internal hosts/paths + drop Sulkta-internal refs
URLs, mount paths, and LAN host bindings parameterized via env or relative paths
so the repo stands up from a clean clone anywhere. Drop cross-codebase refs
("mirrors clawdforge's pattern"), Sulkta-Coop client/merchant test fixtures,
and audit-changelog scaffolding from comments. README terser, technical content
preserved.
2026-05-27 11:25:47 -07:00

1241 lines
44 KiB
Python

"""Autonomous patch loop — wave 3 / step 9.
Lifecycle, end-to-end:
1. A job finishes with one or more `actionable` findings (lint with
file/line, cve with a known fix-version). The runner's post-job hook
calls :meth:`Patcher.maybe_draft_for_job`.
2. For each candidate finding (highest severity first, capped at
`max_attempts_per_finding`):
a. Pull surrounding source from the project's bare-clone-backed
worktree (`±20 lines` around `finding.line`).
b. Open a clawdforge session via ``POST /sessions`` with `agent="claude"`
and metadata identifying the job + finding.
c. Send one turn with a structured prompt; expect a JSON object
``{"diff": ..., "explanation": ..., "confidence": ...}`` back.
d. Apply the diff in a fresh worktree on a new branch
``crafting-table/auto/<job_id>-<finding_id>``. Use
``git apply --check`` first; failure → status=``apply_failed``.
e. Re-run the failing recipe on the patched worktree (the *verify*
step). Failure → status=``verify_failed``.
f. Commit + push the branch to origin.
g. Open a Gitea PR (``POST /api/v1/repos/<owner>/<repo>/pulls``) with
title ``[crafting-table] auto-patch <finding.code>``.
3. Persist a row in ``patch_attempts`` regardless of which step failed —
so the digest can surface "we tried; it didn't work" honestly.
4. Always close the clawdforge session in a ``finally``.
**Verification cost**: re-running the recipe on the patched worktree is
the only safety net. For a recipe with a 20-minute build, the verify step
DOUBLES the latency. We recommend ``notify.auto_patch=true`` only on
projects whose audit/test recipe runs in under 5 minutes, or where the
operator accepts the latency. v0.2 candidate: a "fast verify" mode that
re-runs only the specific lint that fired, not the whole recipe.
Network calls go through a tiny inline ``httpx`` wrapper instead of
the full clawdforge SDK — keeps the dep surface small and the wire
shape obvious.
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
import shutil
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable, Literal
from urllib.parse import urlparse
import httpx
from .db import DB
from .workspace import WorkspaceManager
log = logging.getLogger("crafting_table.patcher")
# Closed set of status strings stored on a patch attempt. The notes below
# describe where the patch loop in this module assigns each value.
PatchStatus = Literal[
    "drafted",  # set when the model reply could not be parsed into a diff
    "apply_failed",  # set when ``git apply`` rejected the diff
    "verify_failed",  # set when the re-run recipe did not pass
    "pushed",  # branch committed + pushed; PR not (yet) opened
    "pr_opened",  # full success: a PR URL was obtained
    "max_attempts_exceeded",  # per-finding attempt cap already reached
    "failed",  # initial value; also set on HTTP or unexpected errors
]
# Findings of these kinds are eligible for auto-patch in v0.1. test_fail is
# NOT in here — too brittle for v0.1, lands in v0.2 with deeper context.
# NOTE(review): nothing in the visible code reads this set; the fixability
# helpers hard-code the same two kinds — confirm whether they should share it.
_FIXABLE_KINDS = {"lint", "cve"}
@dataclass(frozen=True)
class PatcherConfig:
    """Configuration for the autonomous patch loop.

    :meth:`from_env` is the canonical mapping from environment variables.
    The four URL/token fields are required there; ``max_attempts_per_finding``,
    ``auto_patch_branch_prefix`` and ``model`` are optional overrides.
    ``verify_timeout_secs`` and ``http_timeout_secs`` have no env backing and
    keep their defaults unless the config is constructed directly.
    """

    clawdforge_base_url: str
    clawdforge_token: str
    gitea_base_url: str
    gitea_token: str
    max_attempts_per_finding: int = 3
    auto_patch_branch_prefix: str = "crafting-table/auto/"
    # Bound the verify recipe so a runaway patched recipe doesn't tie the
    # patcher up forever. Falls back to the original subproject's
    # timeout_secs when None.
    verify_timeout_secs: int | None = None
    # HTTP timeout margin — clawdforge adds an internal margin, but we cap
    # transport-level too for hung connections.
    http_timeout_secs: int = 600
    # Model passed to clawdforge on session create. Pinned to "opus" by
    # default — code-work prompts (read finding context, write a unified
    # diff that doesn't break the verify recipe) reward Opus's longer
    # context + deeper reasoning. Override via CRAFTING_PATCHER_MODEL env
    # if you want sonnet for cost reasons.
    model: str = "opus"

    @classmethod
    def from_env(cls, env: dict[str, str] | None = None) -> "PatcherConfig | None":
        """Build a config from ``CRAFTING_CLAWDFORGE_*`` / ``CRAFTING_GITEA_*``.

        Args:
            env: Mapping to read from; defaults to a copy of ``os.environ``.
                Tests pass a plain dict so they don't touch the process
                environment.

        Returns:
            A populated config, or ``None`` when any required variable is
            missing or blank — the caller treats that as "patcher disabled".

        Raises:
            ValueError: if ``CRAFTING_PATCHER_MAX_ATTEMPTS`` is set to a
                non-blank value that is not an integer.
        """
        import os

        source = env if env is not None else dict(os.environ)

        def _value(name: str, default: str = "") -> str:
            # Blank / whitespace-only values count as "unset" for optional
            # settings too, matching how the required ones are treated
            # (an empty compose/env-file entry must not crash int("")).
            return (source.get(name) or "").strip() or default

        cf_url = _value("CRAFTING_CLAWDFORGE_URL")
        cf_tok = _value("CRAFTING_CLAWDFORGE_TOKEN")
        gt_url = _value("CRAFTING_GITEA_URL")
        gt_tok = _value("CRAFTING_GITEA_TOKEN")
        if not (cf_url and cf_tok and gt_url and gt_tok):
            return None

        return cls(
            clawdforge_base_url=cf_url.rstrip("/"),
            clawdforge_token=cf_tok,
            gitea_base_url=gt_url.rstrip("/"),
            gitea_token=gt_tok,
            max_attempts_per_finding=int(_value("CRAFTING_PATCHER_MAX_ATTEMPTS", "3")),
            auto_patch_branch_prefix=_value(
                "CRAFTING_PATCHER_BRANCH_PREFIX", "crafting-table/auto/"
            ),
            model=_value("CRAFTING_PATCHER_MODEL", "opus"),
        )
@dataclass
class PatchAttempt:
    """Result of one patch-loop pass over a finding.

    Mirrors the columns in the ``patch_attempts`` table. ``status`` is the
    coarsest signal — ``pr_opened`` is a full success; everything else is
    some kind of failure with the diagnosis carried in ``error``.
    """

    finding_id: int  # id of the finding this attempt targets
    job_id: str  # id of the job that produced the finding
    project_name: str
    attempt_number: int  # 1-based: prior attempt count for the finding + 1
    status: PatchStatus
    branch_name: str | None = None  # auto-patch branch name, when one was chosen
    pr_url: str | None = None  # set once a PR was opened
    diff_excerpt: str | None = None  # leading lines of the drafted diff
    session_id: str | None = None  # clawdforge session used for the draft
    error: str | None = None  # short diagnosis for non-success states
    id: int | None = None  # populated after DB persist
def findings_were_actionable(findings: Iterable[dict]) -> bool:
    """Report whether any finding qualifies for the v0.1 patch loop.

    A finding qualifies when it is a dict and either:

    - its kind is ``lint`` and both ``file`` and ``line`` are truthy (needed
      to extract source context), or
    - its kind is ``cve`` and it carries a truthy ``suggested_fix`` or
      ``code``.

    Every other kind (including ``test_fail``) and every non-dict entry is
    ignored. Evaluation stops at the first qualifying finding.
    """

    def _qualifies(item: object) -> bool:
        if not isinstance(item, dict):
            return False
        kind = item.get("kind")
        if kind == "lint":
            return bool(item.get("file") and item.get("line"))
        if kind == "cve":
            return bool(item.get("suggested_fix") or item.get("code"))
        return False

    return any(_qualifies(item) for item in findings)
# --- clawdforge wire wrapper ------------------------------------------------
class ClawdforgeClient:
    """Minimal async ``httpx`` client for the clawdforge sessions API.

    Kept inline instead of depending on the clawdforge SDK: the SDK is
    synchronous, and this wrapper is small and mirrors the wire shape.

    Endpoints used:
    - ``POST /sessions``            → create
    - ``POST /sessions/{id}/turn``  → one turn
    - ``DELETE /sessions/{id}``     → close
    """

    def __init__(self, base_url: str, token: str, *, timeout_secs: int = 600):
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.timeout_secs = timeout_secs

    @property
    def _headers(self) -> dict[str, str]:
        """Bearer-auth + JSON content-type headers sent with every request."""
        bearer = f"Bearer {self.token}"
        return {"Authorization": bearer, "Content-Type": "application/json"}

    async def _post_json(self, path: str, body: dict[str, Any]) -> dict[str, Any]:
        """POST *body* as JSON to ``base_url + path`` and return the decoded reply.

        Raises whatever ``raise_for_status`` raises on an error status.
        """
        async with httpx.AsyncClient(timeout=self.timeout_secs) as ac:
            r = await ac.post(
                f"{self.base_url}{path}", json=body, headers=self._headers
            )
            r.raise_for_status()
            return r.json()

    async def create_session(
        self,
        *,
        agent: str = "claude",
        model: str | None = None,
        meta: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Create a session; ``model`` / ``meta`` are sent only when given."""
        body: dict[str, Any] = {"agent": agent}
        optional = (("model", model), ("meta", meta))
        body.update({key: value for key, value in optional if value is not None})
        return await self._post_json("/sessions", body)

    async def turn(
        self,
        session_id: str,
        prompt: str,
        *,
        timeout_secs: int | None = None,
    ) -> dict[str, Any]:
        """Send one prompt to *session_id* and return the raw turn payload.

        ``timeout_secs`` is forwarded in the request body when provided; the
        transport timeout is always the client's own ``timeout_secs``.
        """
        body: dict[str, Any] = {"prompt": prompt}
        if timeout_secs is not None:
            body["timeout_secs"] = int(timeout_secs)
        return await self._post_json(f"/sessions/{session_id}/turn", body)

    async def close_session(self, session_id: str) -> None:
        """Best-effort close of *session_id*; HTTP failures are only logged."""
        # 404 / 410 are accepted alongside 200 / 204 so teardown stays
        # quiet when the session is already gone.
        quiet_statuses = (200, 204, 404, 410)
        url = f"{self.base_url}/sessions/{session_id}"
        async with httpx.AsyncClient(timeout=30) as ac:
            try:
                r = await ac.delete(url, headers=self._headers)
                if r.status_code in quiet_statuses:
                    return
                r.raise_for_status()
            except httpx.HTTPError as e:
                log.warning("clawdforge session %s close failed: %s", session_id, e)
def turn_text(payload: dict) -> str:
    """Join the ``content`` of every text event in a turn response.

    Only dict events whose ``type`` is ``"text"`` and whose ``content`` is a
    string contribute. A missing, empty or text-free ``events`` list yields
    the empty string.
    """
    return "".join(
        event["content"]
        for event in (payload.get("events") or [])
        if isinstance(event, dict)
        and event.get("type") == "text"
        and isinstance(event.get("content"), str)
    )
# Match all fenced code blocks with their language tag. Non-greedy (`.*?`)
# + DOTALL so the body can be multi-line but stops at the first closing
# fence. Captures (lang, body).
# NOTE(review): the tag group is `\w*`, which cannot match a hyphen, so a
# fence opened as ```unified-diff is never captured with that tag — confirm
# whether hyphenated tags should be supported.
_FENCE_BLOCK_RE = re.compile(
    r"```(\w*)\s*\n(.*?)```",
    re.DOTALL,
)
# A unified diff starts with one of these patterns somewhere — used to
# detect "is this body a raw diff" without a fence. MULTILINE makes `^`
# match at the start of any line, not just the start of the text.
_DIFF_PREFIX_RE = re.compile(
    r"^(?:diff --git |--- |\+\+\+ |Index: |@@ )",
    re.MULTILINE,
)
def _balanced_json_objects(text: str) -> Iterable[str]:
"""Yield each top-level balanced ``{...}`` JSON candidate in *text*.
Walks the string brace-by-brace tracking depth — handles arbitrary
nesting (the regex form was capped at depth 1 which broke on diffs
that contain struct literals etc.). Skips brace chars inside strings
so {"diff": "fn x() { 1 }"} doesn't break the depth counter.
"""
depth = 0
start = -1
in_str = False
escape = False
for i, ch in enumerate(text):
if escape:
escape = False
continue
if in_str:
if ch == "\\":
escape = True
elif ch == '"':
in_str = False
continue
if ch == '"':
in_str = True
continue
if ch == "{":
if depth == 0:
start = i
depth += 1
elif ch == "}":
depth -= 1
if depth == 0 and start >= 0:
yield text[start : i + 1]
start = -1
def extract_diff_json(text: str) -> dict[str, Any] | None:
    """Pull the patcher's expected ``{"diff", "explanation", "confidence"}``
    payload out of a model response.

    The prompt asks for "JSON ONLY" but in practice models return any of:

    1. **Bare JSON** — what the prompt asked for.
    2. **Fenced JSON** — a ```json fence around the object.
    3. **Fenced diff + prose** — a ```diff fence plus loose explanation
       text, with no JSON wrapper.
    4. **Bare unified diff** — leading lines like ``diff --git`` or
       ``--- a/foo`` with no fences and no JSON at all.

    Strategy, first hit wins:

    a. Parse the whole string as JSON.
    b. Parse each fenced block tagged json / js / javascript / untagged.
    c. Parse each balanced ``{…}`` substring.
    d. Take the first non-empty fenced diff / patch block and synthesize a
       payload, using the prose outside the fences (first 500 chars) as the
       explanation; confidence ``"medium"``.
    e. If any line looks like the start of a unified diff, wrap the whole
       response; confidence ``"low"``.

    A JSON candidate is accepted only when its (normalized) ``diff`` is a
    non-blank string, so an object with a null / non-string ``diff`` does
    not block the later fallbacks. Diffs recovered in (d) and (e) keep
    their interior and trailing-line whitespace (context lines begin with a
    space) and are newline-terminated.

    Returns:
        The payload dict, or ``None`` when no usable diff was found.
    """
    cleaned = text.strip()
    if not cleaned:
        return None

    def _payload_from(raw: str) -> dict[str, Any] | None:
        # Parse *raw* as JSON; return the normalized payload only when it
        # carries a usable diff string.
        try:
            obj = json.loads(raw)
        except (ValueError, TypeError):
            return None
        if not isinstance(obj, dict):
            return None
        cand = _normalize_diff_payload(obj)
        diff = cand.get("diff")
        if isinstance(diff, str) and diff.strip():
            return cand
        return None

    def _as_patch(raw: str) -> str:
        # Trim surrounding blank lines only. A full .strip() would eat a
        # trailing blank context line (a lone space) and the final newline.
        body = raw.lstrip().rstrip("\r\n")
        return f"{body}\n" if body.strip() else ""

    # (a) Bare JSON
    cand = _payload_from(cleaned)
    if cand is not None:
        return cand

    # (b) Fenced JSON; remember fenced diffs for (d)
    diff_fenced: list[str] = []
    for m in _FENCE_BLOCK_RE.finditer(cleaned):
        lang = (m.group(1) or "").lower().strip()
        body = m.group(2)
        if lang in ("json", "", "javascript", "js"):
            cand = _payload_from(body.strip())
            if cand is not None:
                return cand
        elif lang in ("diff", "patch", "unified-diff"):
            patch = _as_patch(body)
            if patch:
                diff_fenced.append(patch)

    # (c) Balanced JSON substring scan, depth + string aware
    for chunk in _balanced_json_objects(cleaned):
        cand = _payload_from(chunk)
        if cand is not None:
            return cand

    # (d) Fenced diff block(s) — synthesize the payload
    if diff_fenced:
        explanation = _strip_fenced_blocks(cleaned).strip()[:500]
        return {
            "diff": diff_fenced[0],
            "explanation": explanation or "diff extracted from fenced ```diff block",
            "confidence": "medium",
        }

    # (e) Bare unified diff at the top level
    if _DIFF_PREFIX_RE.search(cleaned):
        return {
            "diff": _as_patch(text),
            "explanation": "raw unified diff (no JSON wrapper, no code fence)",
            "confidence": "low",
        }
    return None
def _normalize_diff_payload(obj: dict[str, Any]) -> dict[str, Any]:
"""Coerce a found JSON object to the canonical patcher shape.
Models occasionally return the diff under alt keys (``patch``,
``content``) or with extra metadata. We normalize the keys we use and
leave the rest in place for diagnostics. ``confidence`` defaults to
``"medium"`` if unset.
"""
out = dict(obj)
if "diff" not in out:
for alt in ("patch", "content", "diff_text"):
if isinstance(out.get(alt), str):
out["diff"] = out[alt]
break
out.setdefault("explanation", "")
out.setdefault("confidence", "medium")
return out
def _strip_fenced_blocks(text: str) -> str:
    """Drop every fenced code block from *text*, keeping the surrounding prose.

    The remainder serves as the explanation when a diff is recovered from a
    fenced block rather than from a JSON payload.
    """
    prose_only = re.sub(_FENCE_BLOCK_RE, "", text)
    return prose_only
# --- Gitea wire wrapper -----------------------------------------------------
class GiteaClient:
    """Tiny async httpx wrapper around Gitea's PR + repo API.

    Just enough surface to:
    - POST /repos/{owner}/{repo}/pulls      → open a PR
    - GET  /repos/{owner}/{repo}/pulls/{n}  → read a PR's open/closed state
    """

    def __init__(self, base_url: str, token: str, *, timeout_secs: int = 30):
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.timeout_secs = timeout_secs

    @property
    def _headers(self) -> dict[str, str]:
        """Token-auth + JSON headers sent with every request."""
        return {
            "Authorization": f"token {self.token}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

    @staticmethod
    def parse_repo(git_url: str) -> tuple[str, str] | None:
        """Extract ``(owner, repo)`` from a clone URL.

        The owner and repo are taken as the LAST two non-empty path
        segments, so all of these resolve to ``("owner", "repo")``:

        - ``http://user:pass@host/owner/repo.git`` (credentials ignored)
        - ``https://host/gitea/owner/repo`` (forge served under a sub-path)
        - ``https://host/owner//repo/`` (doubled / trailing slashes)
        - ``git@host:owner/repo.git`` (scp-style)

        Returns ``None`` when the input is empty, not a string, unparseable,
        or has fewer than two path segments.
        """
        if not isinstance(git_url, str) or not git_url:
            return None
        try:
            parsed = urlparse(git_url)
        except ValueError:
            return None
        path = parsed.path
        if not parsed.scheme and not parsed.netloc and ":" in path:
            # scp-style "user@host:owner/repo" has no scheme, so urlparse
            # leaves everything in .path; the repo path follows the colon.
            path = path.split(":", 1)[1]
        path = path.strip("/")
        if path.endswith(".git"):
            path = path[:-4]
        segments = [seg for seg in path.split("/") if seg]
        if len(segments) < 2:
            return None
        return segments[-2], segments[-1]

    async def open_pr(
        self,
        *,
        owner: str,
        repo: str,
        title: str,
        body: str,
        head: str,
        base: str,
    ) -> dict[str, Any]:
        """Open a pull request from *head* into *base*; return the JSON reply.

        Raises whatever ``raise_for_status`` raises on an error status.
        """
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls"
        payload = {
            "title": title,
            "body": body,
            "head": head,
            "base": base,
        }
        async with httpx.AsyncClient(timeout=self.timeout_secs) as ac:
            r = await ac.post(url, json=payload, headers=self._headers)
            r.raise_for_status()
            return r.json()

    async def get_pr_state(
        self, *, owner: str, repo: str, number: int
    ) -> str | None:
        """Return the PR's ``state`` string (e.g. ``"open"`` / ``"closed"``).

        Returns ``None`` when the PR is not found (404), the request fails,
        the reply is not valid JSON, or it carries no string ``state`` —
        the caller treats that as "assume open" for the digest follow-up
        count.
        """
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{number}"
        async with httpx.AsyncClient(timeout=self.timeout_secs) as ac:
            try:
                r = await ac.get(url, headers=self._headers)
                if r.status_code == 404:
                    return None
                r.raise_for_status()
                payload = r.json()
            except (httpx.HTTPError, ValueError) as e:
                # ValueError covers an undecodable (non-JSON) body.
                log.warning(
                    "gitea PR state fetch failed for %s/%s#%d: %s",
                    owner,
                    repo,
                    number,
                    e,
                )
                return None
        state = payload.get("state") if isinstance(payload, dict) else None
        return state if isinstance(state, str) else None
# --- prompt building --------------------------------------------------------
_PROMPT_TEMPLATE = """\
You are a code-fixing assistant. A finding was reported by tool X.
FINDING:
kind: {kind}
severity: {severity}
code: {code}
message: {message}
file: {file}
line: {line}
SOURCE CONTEXT (file, ±20 lines around the finding):
```{language}
{source}
```
PROJECT CONTEXT:
git_url: {git_url}
branch: {branch}
subproject: {subproject}
Output a unified diff (git format-patch style) that fixes the finding.
Output JSON ONLY: {{"diff": "<full diff>", "explanation": "<one sentence>", "confidence": "high|medium|low"}}
No prose outside the JSON.
"""
def _build_prompt(
    *,
    finding: dict,
    source_excerpt: str,
    language: str,
    git_url: str,
    branch: str,
    subproject: str,
) -> str:
    """Fill the module's prompt template for one finding.

    Missing ``code`` / ``file`` render as ``(unknown)``, a missing ``line``
    as ``0``, an empty subproject as ``.``; the finding message is cut to
    its first 400 characters.
    """
    fields = {
        "kind": finding.get("kind", ""),
        "severity": finding.get("severity", ""),
        "code": finding.get("code") or "(unknown)",
        "message": (finding.get("message") or "")[:400],
        "file": finding.get("file") or "(unknown)",
        "line": finding.get("line") or 0,
        "language": language or "",
        "source": source_excerpt,
        "git_url": git_url,
        "branch": branch,
        "subproject": subproject or ".",
    }
    return _PROMPT_TEMPLATE.format(**fields)
def _read_source_context(repo_root: Path, file_rel: str, line: int, *, radius: int = 20) -> str:
"""Read ±radius lines around `line` of `file_rel` (1-indexed) from
repo_root. Returns an empty string if the file can't be read or the
line is out of range — patch loop continues with no context, the model
just gets less to chew on."""
try:
path = (repo_root / file_rel).resolve()
# Defensive: ensure path stays under repo_root.
if not str(path).startswith(str(repo_root.resolve())):
return ""
text = path.read_text(encoding="utf-8", errors="replace")
except (OSError, UnicodeError):
return ""
lines = text.splitlines()
if not lines:
return ""
n = max(1, int(line))
start = max(0, n - 1 - radius)
end = min(len(lines), n + radius)
out = []
for i in range(start, end):
prefix = ">>>" if (i + 1) == n else " "
out.append(f"{prefix} {i + 1}: {lines[i]}")
return "\n".join(out)
# --- the patcher itself ----------------------------------------------------
class Patcher:
    """Owns the autonomous patch lifecycle.

    A single ``Patcher`` instance is constructed at server startup and
    bound to:

    - the same ``DB`` the runner writes to,
    - the same ``WorkspaceManager`` that materializes per-job worktrees,
    - a ``Runner`` reference (for the verify step — re-running a recipe
      uses the runner's own primitives so we don't reimplement subprocess
      lifecycle here),
    - a ``PatcherConfig`` with clawdforge + Gitea creds.

    The runner's hook calls :meth:`maybe_draft_for_job`. Tests can call
    :meth:`maybe_draft` directly with a finding_id for fine-grained
    assertions.
    """

    def __init__(
        self,
        *,
        db: DB,
        workspace: WorkspaceManager,
        config: PatcherConfig,
        runner: Any | None = None,
        clawdforge: ClawdforgeClient | None = None,
        gitea: GiteaClient | None = None,
    ):
        """Bind collaborators; build default HTTP clients from *config* when
        none are injected."""
        self.db = db
        self.workspace = workspace
        self.config = config
        self.runner = runner
        self.clawdforge = clawdforge or ClawdforgeClient(
            base_url=config.clawdforge_base_url,
            token=config.clawdforge_token,
            # Honour the configured transport cap instead of silently
            # falling back to the client's built-in default.
            timeout_secs=config.http_timeout_secs,
        )
        self.gitea = gitea or GiteaClient(
            base_url=config.gitea_base_url,
            token=config.gitea_token,
        )

    # ---------- public API --------------------------------------------------

    async def maybe_draft(
        self, job_id: str, finding_id: int | None = None
    ) -> PatchAttempt | None:
        """Attempt one patch on `job_id`.

        If `finding_id` is None, picks the highest-severity fixable finding
        from this job. Returns None if the job is unknown or has nothing
        actionable.
        """
        job = await self.db.arun(self.db.get_job, job_id)
        if job is None:
            log.warning("patcher: job %s not found", job_id)
            return None
        if finding_id is None:
            chosen = await self._pick_finding(job_id)
            if chosen is None:
                log.info("patcher: no actionable finding on job %s", job_id)
                return None
            finding_id = int(chosen["id"])
        return await self._draft_one(job=job, finding_id=int(finding_id))

    async def maybe_draft_for_job(self, job: dict) -> list[PatchAttempt]:
        """Draft one patch attempt for every fixable finding on *job*.

        Findings are processed highest severity first. Each call makes at
        most one attempt per finding; the ``max_attempts_per_finding`` cap is
        enforced across calls by :meth:`_draft_one`.

        An unexpected error while handling one finding is logged and does
        not stop the remaining findings from being tried.

        Returns:
            The attempts that were persisted, in processing order.
        """
        attempts: list[PatchAttempt] = []
        findings = await self.db.arun(self.db.list_findings, job["id"])
        if not findings_were_actionable(findings):
            return attempts
        # Highest-severity-first ordering. Severity ranking matches what the
        # parsers emit: critical > high > error > warn > info.
        ranked = sorted(findings, key=_severity_rank, reverse=True)
        for f in ranked:
            if not _finding_is_fixable(f):
                continue
            try:
                attempt = await self._draft_one(job=job, finding_id=int(f["id"]))
            except Exception:
                # Boundary of the per-finding loop: keep going so one bad
                # finding doesn't starve the rest of a noisy run.
                log.exception(
                    "patcher: draft crashed for finding %s on job %s",
                    f.get("id"),
                    job.get("id"),
                )
                continue
            if attempt is not None:
                attempts.append(attempt)
        return attempts

    # ---------- core --------------------------------------------------------

    async def _pick_finding(self, job_id: str) -> dict | None:
        """Return the highest-severity fixable finding on *job_id*, if any."""
        findings = await self.db.arun(self.db.list_findings, job_id)
        ranked = sorted(findings, key=_severity_rank, reverse=True)
        for f in ranked:
            if _finding_is_fixable(f):
                return f
        return None

    async def _draft_one(self, *, job: dict, finding_id: int) -> PatchAttempt | None:
        """Run the full draft → apply → verify → push → PR pipeline for one
        finding.

        Returns ``None`` (nothing persisted) when the finding or project row
        is missing. Otherwise a ``patch_attempts`` row is persisted for
        whichever terminal state is reached, and the attempt is returned.
        The clawdforge session and the scratch worktree are always released.
        """
        finding = await self.db.arun(self.db.get_finding, finding_id)
        if finding is None:
            log.warning("patcher: finding %s not found", finding_id)
            return None

        cap = self.config.max_attempts_per_finding
        prior = await self.db.arun(self.db.count_patch_attempts, finding_id)
        attempt_number = prior + 1
        if prior >= cap:
            return await self._persist(
                PatchAttempt(
                    finding_id=finding_id,
                    job_id=job["id"],
                    project_name=job["project_name"],
                    attempt_number=attempt_number,
                    status="max_attempts_exceeded",
                    error=f"already had {prior} prior attempts (cap {cap})",
                )
            )

        project = await self.db.arun(self.db.get_project, job["project_name"])
        if project is None:
            return None

        branch_name = (
            f"{self.config.auto_patch_branch_prefix}{job['id']}-{finding_id}"
        )
        attempt = PatchAttempt(
            finding_id=finding_id,
            job_id=job["id"],
            project_name=job["project_name"],
            attempt_number=attempt_number,
            status="failed",
            branch_name=branch_name,
        )
        session_id: str | None = None
        try:
            # Parsed inside the try so a corrupt snapshot is recorded as a
            # failed attempt rather than escaping the pipeline.
            snapshot = json.loads(job["recipe_snapshot_json"])
            sub = _find_subproject(snapshot, job["subproject_path"])
            language = (sub.get("language") if sub else "") or ""
            sub_rel = (sub.get("path") if sub else None) or "."

            session_payload = await self.clawdforge.create_session(
                agent="claude",
                model=self.config.model,
                meta={
                    "crafting_table_job_id": job["id"],
                    "finding_id": finding_id,
                    "project_name": job["project_name"],
                    "subproject": job["subproject_path"],
                },
            )
            session_id = session_payload.get("session_id")
            if not session_id:
                raise RuntimeError("clawdforge create_session returned no session_id")
            attempt.session_id = session_id

            # Materialize a worktree to read source context AND host the
            # patch. The synthetic job id is keyed on the attempt so each
            # attempt gets its own worktree.
            patch_job_id = f"patch-{job['id']}-{finding_id}-{attempt_number}"
            paths = await self._materialize_worktree(
                project=job["project_name"],
                git_url=project["git_url"],
                branch=job["branch"],
                patch_job_id=patch_job_id,
            )
            try:
                # 1. Build prompt with source context
                source_excerpt = _read_source_context(
                    paths.worktree_dir / sub_rel,
                    finding.get("file") or "",
                    finding.get("line") or 1,
                )
                prompt = _build_prompt(
                    finding=finding,
                    source_excerpt=source_excerpt,
                    language=language,
                    git_url=project["git_url"],
                    branch=job["branch"],
                    subproject=job["subproject_path"],
                )

                # 2. Send turn
                turn_payload = await self.clawdforge.turn(session_id, prompt)
                parsed = extract_diff_json(turn_text(turn_payload))
                if parsed is None or not isinstance(parsed.get("diff"), str):
                    # The model answered, but not with a usable diff.
                    attempt.status = "drafted"
                    attempt.error = "malformed_response"
                    return await self._persist(attempt)
                diff_text = parsed["diff"]
                attempt.diff_excerpt = "\n".join(diff_text.splitlines()[:30])
                explanation = parsed.get("explanation") or "auto-patch"

                # 3. Apply. git runs in a worker thread so the blocking
                # subprocess calls don't stall the event loop.
                applied = await asyncio.to_thread(
                    self._apply_diff_to_worktree, paths.worktree_dir, diff_text
                )
                if not applied:
                    attempt.status = "apply_failed"
                    attempt.error = "git apply rejected the diff"
                    return await self._persist(attempt)

                # 4. Verify by re-running the same recipe.
                verify_ok = await self._verify(
                    job=job,
                    snapshot=snapshot,
                    paths=paths,
                    finding=finding,
                )
                if not verify_ok:
                    attempt.status = "verify_failed"
                    attempt.error = (
                        "recipe still failed after patch (or new findings appeared)"
                    )
                    return await self._persist(attempt)

                # 5. Commit + push to a branch on origin (worker thread,
                # same reason as step 3).
                pushed_branch = await asyncio.to_thread(
                    self._commit_and_push,
                    worktree_dir=paths.worktree_dir,
                    branch_name=branch_name,
                    finding=finding,
                    explanation=explanation,
                )
                if not pushed_branch:
                    # Verification passed; the failure is in commit/push, so
                    # don't report it as a verify failure.
                    attempt.status = "failed"
                    attempt.error = "git commit/push failed"
                    return await self._persist(attempt)
                attempt.status = "pushed"

                # 6. Open Gitea PR.
                pr_url = await self._open_pr(
                    project=project,
                    branch_name=branch_name,
                    base_branch=job["branch"],
                    finding=finding,
                    explanation=explanation,
                    confidence=parsed.get("confidence") or "medium",
                    diff_excerpt=attempt.diff_excerpt or "",
                )
                if pr_url:
                    attempt.pr_url = pr_url
                    attempt.status = "pr_opened"
                else:
                    # Stays "pushed"; say why there is no PR.
                    attempt.error = "branch pushed but PR could not be opened"
                return await self._persist(attempt)
            finally:
                # Always clean up worktree to avoid /workspace bloat.
                try:
                    await self.workspace.cleanup(paths)
                except Exception as e:  # pragma: no cover - defensive
                    log.warning("patcher: worktree cleanup failed: %s", e)
        except httpx.HTTPError as e:
            attempt.status = "failed"
            attempt.error = f"http: {e!s}"[:400]
            return await self._persist(attempt)
        except Exception as e:
            log.exception("patcher: unexpected error on finding %s", finding_id)
            attempt.status = "failed"
            attempt.error = f"{type(e).__name__}: {e!s}"[:400]
            return await self._persist(attempt)
        finally:
            if session_id is not None:
                try:
                    await self.clawdforge.close_session(session_id)
                except Exception as e:  # pragma: no cover - defensive
                    log.warning("patcher: clawdforge close failed: %s", e)

    # ---------- internals ---------------------------------------------------

    async def _materialize_worktree(
        self,
        *,
        project: str,
        git_url: str,
        branch: str,
        patch_job_id: str,
    ):
        """Materialize a fresh worktree for the patch attempt.

        Uses a scratch in-memory log buffer because the patch attempt is its
        own thing — separate from the originating job's recipe log. Returns
        whatever ``WorkspaceManager.materialize`` returns.
        """
        from io import StringIO

        return await self.workspace.materialize(
            project=project,
            job_id=patch_job_id,
            git_url=git_url,
            branch=branch,
            log_fh=StringIO(),
        )

    def _apply_diff_to_worktree(self, worktree_dir: Path, diff_text: str) -> bool:
        """Run ``git apply --check`` then ``git apply`` against the diff.

        The diff is written to a temporary file inside the worktree, which
        is removed again afterwards. ``--whitespace=nowarn`` is used because
        tool-suggested fixes occasionally carry trailing whitespace that
        would otherwise be rejected.

        Blocking: runs git synchronously (up to 60s per step).

        Returns:
            True when both the check and the apply succeed; False on any
            rejection, timeout, or I/O error.
        """
        # git rejects a patch whose final line is not newline-terminated;
        # diffs recovered from JSON strings frequently lack it.
        if not diff_text.endswith("\n"):
            diff_text += "\n"
        diff_path = worktree_dir / ".crafting-patch.diff"
        try:
            diff_path.write_text(diff_text, encoding="utf-8")
        except OSError as e:
            log.warning("patcher: could not write diff: %s", e)
            return False
        try:
            check = subprocess.run(
                ["git", "apply", "--check", "--whitespace=nowarn", str(diff_path)],
                cwd=str(worktree_dir),
                capture_output=True,
                text=True,
                timeout=60,
            )
            if check.returncode != 0:
                log.info("patcher: git apply --check failed: %s", check.stderr.strip())
                return False
            applied = subprocess.run(
                ["git", "apply", "--whitespace=nowarn", str(diff_path)],
                cwd=str(worktree_dir),
                capture_output=True,
                text=True,
                timeout=60,
            )
            if applied.returncode != 0:
                log.info("patcher: git apply failed: %s", applied.stderr.strip())
                return False
            return True
        except (subprocess.TimeoutExpired, OSError) as e:
            log.warning("patcher: git apply error: %s", e)
            return False
        finally:
            try:
                diff_path.unlink()
            except OSError:
                pass

    async def _verify(
        self,
        *,
        job: dict,
        snapshot: dict,
        paths,
        finding: dict,
    ) -> bool:
        """Re-run the originating recipe against the patched worktree.

        When a runner exposing ``_exec_recipe`` is bound, the recipe runs
        through it; otherwise a plain shell subprocess is used (test
        contexts).

        The patch verifies only when the recipe exits 0 without timing out.
        Any nonzero exit is a failure even if the original finding's code no
        longer appears in the output — other checks may have fired, and the
        conservative answer is "not verified". *finding* is used only to log
        whether its code is still present in a failing run.

        Returns False as well when the subproject or its recipe command
        cannot be found in *snapshot*.
        """
        sub = _find_subproject(snapshot, job["subproject_path"])
        if sub is None:
            return False
        cmd = sub.get(job["recipe"])
        if not cmd:
            return False
        cwd = paths.worktree_dir / (sub.get("path") or ".")
        timeout = (
            self.config.verify_timeout_secs
            or int(sub.get("timeout_secs") or 1800)
        )

        if self.runner is not None and hasattr(self.runner, "_exec_recipe"):
            from io import StringIO

            log_fh = StringIO()
            try:
                exit_code, timed_out = await self.runner._exec_recipe(
                    cmd=cmd, cwd=str(cwd), log_fh=log_fh, timeout=timeout
                )
            except Exception as e:
                log.warning("patcher: verify run failed via runner: %s", e)
                return False
            if timed_out or exit_code != 0:
                fcode = (finding.get("code") or "").strip()
                log.info(
                    "patcher: verify failed (exit=%s, timed_out=%s, "
                    "finding code still in output=%s)",
                    exit_code,
                    timed_out,
                    bool(fcode) and fcode in log_fh.getvalue(),
                )
                return False
            return True

        # Fallback path for test environments without a usable runner.
        proc = await asyncio.create_subprocess_shell(
            cmd,
            cwd=str(cwd),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        try:
            # communicate() drains the pipe. Waiting with wait() while
            # stdout is a PIPE can deadlock once the child fills the OS
            # pipe buffer.
            await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # already exited between the timeout and the kill
            await proc.wait()
            return False
        return proc.returncode == 0

    def _commit_and_push(
        self,
        *,
        worktree_dir: Path,
        branch_name: str,
        finding: dict,
        explanation: str,
    ) -> bool:
        """Commit the worktree changes to a new branch and push to origin.

        Author/committer default to ``crafting-table <crafting-table@localhost>``,
        overridable via ``CRAFTING_PATCHER_AUTHOR_NAME`` +
        ``CRAFTING_PATCHER_AUTHOR_EMAIL``. The commit uses ``--no-gpg-sign``.
        The commit message references the finding id and the first line of
        its message (empty when the finding has none).

        All four git steps run with the process environment plus the
        identity variables, so git is located and configured the same way
        for every step.

        Blocking: runs git synchronously (30s per local step, 120s push).

        Returns:
            True when checkout, add, commit and push all succeed; False on
            any failing, timed-out, or unstartable step.
        """
        import os

        author_name = os.environ.get("CRAFTING_PATCHER_AUTHOR_NAME", "crafting-table")
        author_email = os.environ.get(
            "CRAFTING_PATCHER_AUTHOR_EMAIL", "crafting-table@localhost"
        )
        env = {
            **os.environ,
            "GIT_AUTHOR_NAME": author_name,
            "GIT_AUTHOR_EMAIL": author_email,
            "GIT_COMMITTER_NAME": author_name,
            "GIT_COMMITTER_EMAIL": author_email,
        }
        env.setdefault("PATH", "/usr/local/bin:/usr/bin:/bin")

        # ''.splitlines() is [], so indexing [0] would raise on a finding
        # without a message.
        first_line = next(iter((finding.get("message") or "").splitlines()), "")
        msg = (
            f"crafting-table auto-patch: {finding.get('code') or finding.get('kind')}\n"
            f"\n"
            f"finding #{finding.get('id')}: {first_line[:120]}\n"
            f"\n"
            f"{explanation}\n"
        )
        steps = (
            (["git", "checkout", "-b", branch_name], 30),
            (["git", "add", "-A"], 30),
            (["git", "commit", "-m", msg, "--no-gpg-sign"], 30),
            (["git", "push", "origin", branch_name], 120),
        )
        try:
            for argv, step_timeout in steps:
                subprocess.run(
                    argv,
                    cwd=str(worktree_dir),
                    env=env,
                    check=True,
                    capture_output=True,
                    timeout=step_timeout,
                )
            return True
        except subprocess.CalledProcessError as e:
            log.warning(
                "patcher: git step failed: %s\nstdout=%s\nstderr=%s",
                e,
                (e.stdout or b"").decode("utf-8", "replace")[:400],
                (e.stderr or b"").decode("utf-8", "replace")[:400],
            )
            return False
        except (subprocess.TimeoutExpired, OSError) as e:
            log.warning("patcher: git step timed out / errored: %s", e)
            return False

    async def _open_pr(
        self,
        *,
        project: dict,
        branch_name: str,
        base_branch: str,
        finding: dict,
        explanation: str,
        confidence: str,
        diff_excerpt: str,
    ) -> str | None:
        """Open a Gitea PR for the pushed branch.

        Returns the PR's ``html_url`` (falling back to ``url``) on success;
        None when owner/repo can't be derived from the project's git URL,
        the request fails, or the reply carries no URL.
        """
        owner_repo = GiteaClient.parse_repo(project["git_url"])
        if owner_repo is None:
            log.warning(
                "patcher: could not parse owner/repo from %s", project["git_url"]
            )
            return None
        owner, repo = owner_repo
        title = f"[crafting-table] auto-patch {finding.get('code') or finding.get('kind') or ''}".strip()
        body = (
            f"Automated patch drafted by crafting-table for finding "
            f"#{finding.get('id')} ({finding.get('kind')} / "
            f"{finding.get('code') or 'no-code'}).\n\n"
            f"**Severity**: {finding.get('severity')}\n"
            f"**File**: `{finding.get('file') or 'unknown'}` "
            f"line {finding.get('line') or '?'}\n"
            f"**Message**: {finding.get('message') or ''}\n\n"
            f"**Explanation**: {explanation}\n"
            f"**Confidence**: {confidence}\n\n"
            f"### Diff (first 30 lines)\n```diff\n{diff_excerpt}\n```\n\n"
            f"_Verify recipe re-ran cleanly on the patched worktree before "
            f"this PR was opened._"
        )
        try:
            payload = await self.gitea.open_pr(
                owner=owner,
                repo=repo,
                title=title,
                body=body,
                head=branch_name,
                base=base_branch,
            )
        except httpx.HTTPError as e:
            log.warning("patcher: gitea PR open failed: %s", e)
            return None
        if not isinstance(payload, dict):
            return None
        url = payload.get("html_url") or payload.get("url")
        return str(url) if url else None

    async def _persist(self, attempt: PatchAttempt) -> PatchAttempt:
        """Insert *attempt* into ``patch_attempts`` and stamp its row id."""
        row_id = await self.db.arun(
            self.db.insert_patch_attempt,
            finding_id=attempt.finding_id,
            job_id=attempt.job_id,
            project_name=attempt.project_name,
            attempt_number=attempt.attempt_number,
            status=attempt.status,
            branch_name=attempt.branch_name,
            pr_url=attempt.pr_url,
            diff_excerpt=attempt.diff_excerpt,
            session_id=attempt.session_id,
            error=attempt.error,
        )
        attempt.id = row_id
        return attempt
# --- helpers ----------------------------------------------------------------
_SEVERITY_RANK = {
"critical": 5,
"high": 4,
"error": 3,
"warn": 2,
"warning": 2,
"medium": 2,
"info": 1,
"low": 1,
}
def _severity_rank(finding: dict) -> int:
return _SEVERITY_RANK.get((finding.get("severity") or "").lower(), 0)
def _finding_is_fixable(f: dict) -> bool:
if not isinstance(f, dict):
return False
kind = f.get("kind")
if kind == "lint":
return bool(f.get("file") and f.get("line"))
if kind == "cve":
return bool(f.get("suggested_fix") or f.get("code"))
return False
def _find_subproject(snapshot: dict, path: str) -> dict | None:
for s in snapshot.get("subprojects", []):
if s.get("path") == path:
return s
return None
__all__ = [
"PatcherConfig",
"Patcher",
"PatchAttempt",
"PatchStatus",
"ClawdforgeClient",
"GiteaClient",
"findings_were_actionable",
"extract_diff_json",
"turn_text",
]