"""Autonomous patch loop — wave 3 / step 9. Lifecycle, end-to-end: 1. A job finishes with one or more `actionable` findings (lint with file/line, cve with a known fix-version). The runner's post-job hook calls :meth:`Patcher.maybe_draft_for_job`. 2. For each candidate finding (highest severity first, capped at `max_attempts_per_finding`): a. Pull surrounding source from the project's bare-clone-backed worktree (`±20 lines` around `finding.line`). b. Open a clawdforge session via ``POST /sessions`` with `agent="claude"` and metadata identifying the job + finding. c. Send one turn with a structured prompt; expect a JSON object ``{"diff": ..., "explanation": ..., "confidence": ...}`` back. d. Apply the diff in a fresh worktree on a new branch ``crafting-table/auto/-``. Use ``git apply --check`` first; failure → status=``apply_failed``. e. Re-run the failing recipe on the patched worktree (the *verify* step). Failure → status=``verify_failed``. f. Commit + push the branch to origin. g. Open a Gitea PR (``POST /api/v1/repos///pulls``) with title ``[crafting-table] auto-patch ``. 3. Persist a row in ``patch_attempts`` regardless of which step failed — so the digest can surface "we tried; it didn't work" honestly. 4. Always close the clawdforge session in a ``finally``. **Verification cost**: re-running the recipe on the patched worktree is the only safety net. For a recipe with 20-minute build the verify step DOUBLES the latency. Recommend ``notify.auto_patch=true`` only on projects where the audit/test recipe is <5min, OR the operator accepts the latency. v0.2 candidate: a "fast verify" mode that re-runs only the specific lint that fired, not the whole recipe. Network calls go through a tiny inline ``httpx`` wrapper instead of the full clawdforge SDK — keeps the dep surface small and the wire shape obvious. """ from __future__ import annotations import asyncio import json import logging import re import shutil import subprocess import time from dataclasses import dataclass from pathlib import Path from typing import Any, Iterable, Literal from urllib.parse import urlparse import httpx from .db import DB from .workspace import WorkspaceManager log = logging.getLogger("crafting_table.patcher") PatchStatus = Literal[ "drafted", "apply_failed", "verify_failed", "pushed", "pr_opened", "max_attempts_exceeded", "failed", ] # Findings of these kinds are eligible for auto-patch in v0.1. test_fail is # NOT in here — too brittle for v0.1, lands in v0.2 with deeper context. _FIXABLE_KINDS = {"lint", "cve"} @dataclass(frozen=True) class PatcherConfig: """Configuration for the autonomous patch loop. All fields can be sourced from environment variables — see :meth:`from_env` for the canonical mapping. The fields without env backing (`max_attempts_per_finding`, `auto_patch_branch_prefix`) are knobs that practically never change at deploy time. """ clawdforge_base_url: str clawdforge_token: str gitea_base_url: str gitea_token: str max_attempts_per_finding: int = 3 auto_patch_branch_prefix: str = "crafting-table/auto/" # Bound the verify recipe so a runaway patched recipe doesn't tie the # patcher up forever. Falls back to the original subproject's # timeout_secs when None. verify_timeout_secs: int | None = None # HTTP timeout margin — clawdforge adds an internal margin, but we cap # transport-level too for hung connections. http_timeout_secs: int = 600 # Model passed to clawdforge on session create. Pinned to "opus" by # default — code-work prompts (read finding context, write a unified # diff that doesn't break the verify recipe) reward Opus's longer # context + deeper reasoning. Override via CRAFTING_PATCHER_MODEL env # if you want sonnet for cost reasons. model: str = "opus" @classmethod def from_env(cls, env: dict[str, str] | None = None) -> "PatcherConfig | None": """Return a config populated from CRAFTING_CLAWDFORGE_* / CRAFTING_GITEA_* env vars. Returns None if any required var is missing — caller treats that as "patcher disabled." The reason this is a classmethod (not a free fn) is so tests can construct a config directly without needing every env var, while production reads from the process environment. """ import os e = env if env is not None else dict(os.environ) cf_url = e.get("CRAFTING_CLAWDFORGE_URL", "").strip() cf_tok = e.get("CRAFTING_CLAWDFORGE_TOKEN", "").strip() gt_url = e.get("CRAFTING_GITEA_URL", "").strip() gt_tok = e.get("CRAFTING_GITEA_TOKEN", "").strip() if not (cf_url and cf_tok and gt_url and gt_tok): return None return cls( clawdforge_base_url=cf_url.rstrip("/"), clawdforge_token=cf_tok, gitea_base_url=gt_url.rstrip("/"), gitea_token=gt_tok, max_attempts_per_finding=int(e.get("CRAFTING_PATCHER_MAX_ATTEMPTS", "3")), auto_patch_branch_prefix=e.get( "CRAFTING_PATCHER_BRANCH_PREFIX", "crafting-table/auto/" ), model=e.get("CRAFTING_PATCHER_MODEL", "opus"), ) @dataclass class PatchAttempt: """Result of one patch-loop pass over a finding. Mirrors the columns in the ``patch_attempts`` table. ``status`` is the coarsest signal — ``pr_opened`` is a full success; everything else is some kind of failure with the diagnosis carried in ``error``. """ finding_id: int job_id: str project_name: str attempt_number: int status: PatchStatus branch_name: str | None = None pr_url: str | None = None diff_excerpt: str | None = None session_id: str | None = None error: str | None = None id: int | None = None # populated after DB persist def findings_were_actionable(findings: Iterable[dict]) -> bool: """Return True if at least one finding is fixable by the v0.1 loop. Rules: - kind ``lint`` requires a file + line (so we can extract context). - kind ``cve`` is fixable when a fix is at least suggested (we trust the parser's ``suggested_fix`` text — clippy etc. set it when they can; cargo-audit's ``fixed_in`` lands in suggested_fix via the Rust parser). - kind ``test_fail`` is NOT actionable in v0.1 (too brittle, no reliable single-line fix locator). """ for f in findings: if not isinstance(f, dict): continue kind = f.get("kind") if kind == "lint" and f.get("file") and f.get("line"): return True if kind == "cve" and (f.get("suggested_fix") or f.get("code")): return True return False # --- clawdforge wire wrapper ------------------------------------------------ class ClawdforgeClient: """Tiny async httpx wrapper around the clawdforge sessions API. We deliberately avoid the full clawdforge SDK because: - The SDK is sync (``requests``-based); we'd have to wrap every call in ``asyncio.to_thread`` anyway. - Pip-installing the SDK from a sibling LAN repo at runtime is brittle; this wrapper is ~50 lines, matches the wire shape exactly, and lives next to its consumer. Endpoints used: - ``POST /sessions`` → create - ``POST /sessions/{id}/turn`` → one turn - ``DELETE /sessions/{id}`` → close (idempotent) """ def __init__(self, base_url: str, token: str, *, timeout_secs: int = 600): self.base_url = base_url.rstrip("/") self.token = token self.timeout_secs = timeout_secs @property def _headers(self) -> dict[str, str]: return { "Authorization": f"Bearer {self.token}", "Content-Type": "application/json", } async def create_session( self, *, agent: str = "claude", model: str | None = None, meta: dict[str, Any] | None = None, ) -> dict[str, Any]: body: dict[str, Any] = {"agent": agent} if model is not None: body["model"] = model if meta is not None: body["meta"] = meta async with httpx.AsyncClient(timeout=self.timeout_secs) as ac: r = await ac.post( f"{self.base_url}/sessions", json=body, headers=self._headers ) r.raise_for_status() return r.json() async def turn( self, session_id: str, prompt: str, *, timeout_secs: int | None = None, ) -> dict[str, Any]: body: dict[str, Any] = {"prompt": prompt} if timeout_secs is not None: body["timeout_secs"] = int(timeout_secs) async with httpx.AsyncClient(timeout=self.timeout_secs) as ac: r = await ac.post( f"{self.base_url}/sessions/{session_id}/turn", json=body, headers=self._headers, ) r.raise_for_status() return r.json() async def close_session(self, session_id: str) -> None: # Idempotent server-side; we still swallow 404 to keep teardown # noise-free if the session was already GC'd. async with httpx.AsyncClient(timeout=30) as ac: try: r = await ac.delete( f"{self.base_url}/sessions/{session_id}", headers=self._headers, ) if r.status_code not in (200, 204, 404, 410): r.raise_for_status() except httpx.HTTPError as e: log.warning("clawdforge session %s close failed: %s", session_id, e) def turn_text(payload: dict) -> str: """Extract concatenated 'text' events from a /sessions/turn response. Matches the SDK's ``TurnResult.text`` semantics. Falls back to an empty string when the events list is missing or contains no text events. """ events = payload.get("events") or [] parts: list[str] = [] for ev in events: if not isinstance(ev, dict): continue if ev.get("type") == "text": content = ev.get("content") if isinstance(content, str): parts.append(content) return "".join(parts) # Match all fenced code blocks with their language tag. Greedy + DOTALL so # the body can be multi-line. Captures (lang, body). _FENCE_BLOCK_RE = re.compile( r"```(\w*)\s*\n(.*?)```", re.DOTALL, ) # A unified diff starts with one of these patterns somewhere — used to # detect "is this body a raw diff" without a fence. _DIFF_PREFIX_RE = re.compile( r"^(?:diff --git |--- |\+\+\+ |Index: |@@ )", re.MULTILINE, ) def _balanced_json_objects(text: str) -> Iterable[str]: """Yield each top-level balanced ``{...}`` JSON candidate in *text*. Walks the string brace-by-brace tracking depth — handles arbitrary nesting (the regex form was capped at depth 1 which broke on diffs that contain struct literals etc.). Skips brace chars inside strings so {"diff": "fn x() { 1 }"} doesn't break the depth counter. """ depth = 0 start = -1 in_str = False escape = False for i, ch in enumerate(text): if escape: escape = False continue if in_str: if ch == "\\": escape = True elif ch == '"': in_str = False continue if ch == '"': in_str = True continue if ch == "{": if depth == 0: start = i depth += 1 elif ch == "}": depth -= 1 if depth == 0 and start >= 0: yield text[start : i + 1] start = -1 def extract_diff_json(text: str) -> dict[str, Any] | None: """Pull the patcher's expected ``{"diff", "explanation", "confidence"}`` payload out of an Opus / Sonnet / agent response. The prompt asks for "JSON ONLY" but in practice models return any of: 1. **Bare JSON** — what the prompt asked for. 2. **Fenced JSON** — ```` ```json {…} ``` ````. 3. **Fenced diff + prose** — ``` ```diff …unified diff… ``` ``` plus loose explanation text. No JSON wrapper. 4. **Bare unified diff** — leading lines like ``diff --git`` or ``--- a/foo`` with no fences and no JSON at all. Strategy: a. Try the whole string as JSON; accept if it has a ``diff`` field. b. Walk all fenced blocks; if any is JSON-with-diff, accept. c. Walk balanced ``{…}`` substrings (depth-aware, string-aware) and accept the first one with a ``diff`` field. d. Fall back to scanning fenced ``diff`` blocks; if found, build a synthetic payload using the prose around the fence as the explanation. Confidence defaults to ``"medium"``. e. Final fallback: if the entire body looks like a raw unified diff, wrap it the same way as (d). Returns ``None`` only when none of the above produces a usable diff. """ cleaned = text.strip() if not cleaned: return None # (a) Bare JSON try: obj = json.loads(cleaned) if isinstance(obj, dict): cand = _normalize_diff_payload(obj) if cand.get("diff"): return cand except (ValueError, TypeError): pass # (b) Walk fenced blocks for JSON-with-diff fenced = list(_FENCE_BLOCK_RE.finditer(cleaned)) diff_fenced: list[str] = [] for m in fenced: lang = (m.group(1) or "").lower().strip() body = m.group(2).strip() if lang in ("json", "", "javascript", "js"): try: obj = json.loads(body) if isinstance(obj, dict): cand = _normalize_diff_payload(obj) if cand.get("diff"): return cand except (ValueError, TypeError): pass if lang in ("diff", "patch", "unified-diff"): diff_fenced.append(body) # (c) Balanced JSON substring scan, depth + string aware for chunk in _balanced_json_objects(cleaned): try: obj = json.loads(chunk) except (ValueError, TypeError): continue if isinstance(obj, dict): cand = _normalize_diff_payload(obj) if cand.get("diff"): return cand # (d) Fenced diff block(s) — synthesize the payload if diff_fenced: diff_text = diff_fenced[0] explanation = _strip_fenced_blocks(cleaned).strip()[:500] return { "diff": diff_text, "explanation": explanation or "diff extracted from fenced ```diff block", "confidence": "medium", } # (e) Bare unified diff at the top level if _DIFF_PREFIX_RE.search(cleaned): return { "diff": cleaned, "explanation": "raw unified diff (no JSON wrapper, no code fence)", "confidence": "low", } return None def _normalize_diff_payload(obj: dict[str, Any]) -> dict[str, Any]: """Coerce a found JSON object to the canonical patcher shape. Models occasionally return the diff under alt keys (``patch``, ``content``) or with extra metadata. We normalize the keys we use and leave the rest in place for diagnostics. ``confidence`` defaults to ``"medium"`` if unset. """ out = dict(obj) if "diff" not in out: for alt in ("patch", "content", "diff_text"): if isinstance(out.get(alt), str): out["diff"] = out[alt] break out.setdefault("explanation", "") out.setdefault("confidence", "medium") return out def _strip_fenced_blocks(text: str) -> str: """Return *text* with all ```…``` fenced blocks removed. Used to recover prose-around-code as the explanation field when we extract a raw fenced diff (case d above). """ return _FENCE_BLOCK_RE.sub("", text) # --- Gitea wire wrapper ----------------------------------------------------- class GiteaClient: """Tiny async httpx wrapper around Gitea's PR + repo API. Just enough surface to: - POST /repos/{owner}/{repo}/pulls → open a PR - GET /repos/{owner}/{repo}/pulls/{n} → check open/closed state for digest follow-up counting """ def __init__(self, base_url: str, token: str, *, timeout_secs: int = 30): self.base_url = base_url.rstrip("/") self.token = token self.timeout_secs = timeout_secs @property def _headers(self) -> dict[str, str]: return { "Authorization": f"token {self.token}", "Content-Type": "application/json", "Accept": "application/json", } @staticmethod def parse_repo(git_url: str) -> tuple[str, str] | None: """Extract (owner, repo) from a Gitea http(s) URL. Strips any embedded credentials (``http://user:pass@host/...``) and a trailing ``.git`` suffix. Returns ``None`` if the URL doesn't look like a Gitea-style ``//`` path. """ try: u = urlparse(git_url) except Exception: return None path = u.path.strip("/") if path.endswith(".git"): path = path[:-4] parts = path.split("/") if len(parts) < 2: return None return parts[0], parts[1] async def open_pr( self, *, owner: str, repo: str, title: str, body: str, head: str, base: str, ) -> dict[str, Any]: url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls" payload = { "title": title, "body": body, "head": head, "base": base, } async with httpx.AsyncClient(timeout=self.timeout_secs) as ac: r = await ac.post(url, json=payload, headers=self._headers) r.raise_for_status() return r.json() async def get_pr_state( self, *, owner: str, repo: str, number: int ) -> str | None: """Return ``"open" | "closed"``. ``None`` if the PR couldn't be fetched (auth failure, network blip) — caller treats that as "assume open" for the digest follow-up count. """ url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{number}" async with httpx.AsyncClient(timeout=self.timeout_secs) as ac: try: r = await ac.get(url, headers=self._headers) if r.status_code == 404: return None r.raise_for_status() payload = r.json() state = payload.get("state") if isinstance(state, str): return state return None except httpx.HTTPError as e: log.warning( "gitea PR state fetch failed for %s/%s#%d: %s", owner, repo, number, e, ) return None # --- prompt building -------------------------------------------------------- _PROMPT_TEMPLATE = """\ You are a code-fixing assistant. A finding was reported by tool X. FINDING: kind: {kind} severity: {severity} code: {code} message: {message} file: {file} line: {line} SOURCE CONTEXT (file, ±20 lines around the finding): ```{language} {source} ``` PROJECT CONTEXT: git_url: {git_url} branch: {branch} subproject: {subproject} Output a unified diff (git format-patch style) that fixes the finding. Output JSON ONLY: {{"diff": "", "explanation": "", "confidence": "high|medium|low"}} No prose outside the JSON. """ def _build_prompt( *, finding: dict, source_excerpt: str, language: str, git_url: str, branch: str, subproject: str, ) -> str: return _PROMPT_TEMPLATE.format( kind=finding.get("kind", ""), severity=finding.get("severity", ""), code=finding.get("code") or "(unknown)", message=(finding.get("message") or "")[:400], file=finding.get("file") or "(unknown)", line=finding.get("line") or 0, language=language or "", source=source_excerpt, git_url=git_url, branch=branch, subproject=subproject or ".", ) def _read_source_context(repo_root: Path, file_rel: str, line: int, *, radius: int = 20) -> str: """Read ±radius lines around `line` of `file_rel` (1-indexed) from repo_root. Returns an empty string if the file can't be read or the line is out of range — patch loop continues with no context, the model just gets less to chew on.""" try: path = (repo_root / file_rel).resolve() # Defensive: ensure path stays under repo_root. if not str(path).startswith(str(repo_root.resolve())): return "" text = path.read_text(encoding="utf-8", errors="replace") except (OSError, UnicodeError): return "" lines = text.splitlines() if not lines: return "" n = max(1, int(line)) start = max(0, n - 1 - radius) end = min(len(lines), n + radius) out = [] for i in range(start, end): prefix = ">>>" if (i + 1) == n else " " out.append(f"{prefix} {i + 1}: {lines[i]}") return "\n".join(out) # --- the patcher itself ---------------------------------------------------- class Patcher: """Owns the autonomous patch lifecycle. A single ``Patcher`` instance is constructed at server startup and bound to: - the same ``DB`` the runner writes to, - the same ``WorkspaceManager`` that materializes per-job worktrees, - a ``Runner`` reference (for the verify step — re-running a recipe uses the runner's own primitives so we don't reimplement subprocess lifecycle here), - a ``PatcherConfig`` with clawdforge + Gitea creds. The runner's hook (see ``server.py`` lifespan) calls :meth:`maybe_draft_for_job`. Tests can call :meth:`maybe_draft` directly with a finding_id for fine-grained assertions. """ def __init__( self, *, db: DB, workspace: WorkspaceManager, config: PatcherConfig, runner: Any | None = None, clawdforge: ClawdforgeClient | None = None, gitea: GiteaClient | None = None, ): self.db = db self.workspace = workspace self.config = config self.runner = runner self.clawdforge = clawdforge or ClawdforgeClient( base_url=config.clawdforge_base_url, token=config.clawdforge_token, ) self.gitea = gitea or GiteaClient( base_url=config.gitea_base_url, token=config.gitea_token, ) # ---------- public API -------------------------------------------------- async def maybe_draft( self, job_id: str, finding_id: int | None = None ) -> PatchAttempt | None: """Attempt one patch on `job_id`. If `finding_id` is None, picks the highest-severity unresolved finding from this job. Returns None if there's nothing actionable on the job at all. """ job = await self.db.arun(self.db.get_job, job_id) if job is None: log.warning("patcher: job %s not found", job_id) return None if finding_id is None: chosen = await self._pick_finding(job_id) if chosen is None: log.info("patcher: no actionable finding on job %s", job_id) return None finding_id = int(chosen["id"]) return await self._draft_one(job=job, finding_id=int(finding_id)) async def maybe_draft_for_job(self, job: dict) -> list[PatchAttempt]: """Iterate over actionable findings on a job and draft up to max_attempts_per_finding patches each. Called from the runner's post-job hook when ``project.notify.auto_patch=true``. Failures inside one finding's loop don't stop the others — we want to try every actionable finding on a noisy nightly run. """ attempts: list[PatchAttempt] = [] findings = await self.db.arun(self.db.list_findings, job["id"]) if not findings_were_actionable(findings): return attempts # Highest-severity-first ordering. Severity ranking matches what the # parsers emit: critical > high > error > warn > info. ranked = sorted(findings, key=_severity_rank, reverse=True) for f in ranked: if not _finding_is_fixable(f): continue attempt = await self._draft_one(job=job, finding_id=int(f["id"])) if attempt is not None: attempts.append(attempt) return attempts # ---------- core -------------------------------------------------------- async def _pick_finding(self, job_id: str) -> dict | None: findings = await self.db.arun(self.db.list_findings, job_id) ranked = sorted(findings, key=_severity_rank, reverse=True) for f in ranked: if _finding_is_fixable(f): return f return None async def _draft_one(self, *, job: dict, finding_id: int) -> PatchAttempt | None: """Run the full draft → apply → verify → push → PR pipeline for one finding. Persists a row in patch_attempts on every terminal state. """ finding = await self.db.arun(self.db.get_finding, finding_id) if finding is None: log.warning("patcher: finding %s not found", finding_id) return None prior = await self.db.arun(self.db.count_patch_attempts, finding_id) attempt_number = prior + 1 if prior >= self.config.max_attempts_per_finding: row_id = await self.db.arun( self.db.insert_patch_attempt, finding_id=finding_id, job_id=job["id"], project_name=job["project_name"], attempt_number=attempt_number, status="max_attempts_exceeded", error=f"already had {prior} prior attempts (cap {self.config.max_attempts_per_finding})", ) return PatchAttempt( id=row_id, finding_id=finding_id, job_id=job["id"], project_name=job["project_name"], attempt_number=attempt_number, status="max_attempts_exceeded", error=f"already had {prior} prior attempts", ) # Pull project + recipe context. project = await self.db.arun(self.db.get_project, job["project_name"]) if project is None: return None snapshot = json.loads(job["recipe_snapshot_json"]) sub = _find_subproject(snapshot, job["subproject_path"]) language = (sub.get("language") if sub else "") or "" # Build attempt scaffolding. branch_name = ( f"{self.config.auto_patch_branch_prefix}{job['id']}-{finding_id}" ) attempt = PatchAttempt( finding_id=finding_id, job_id=job["id"], project_name=job["project_name"], attempt_number=attempt_number, status="failed", branch_name=branch_name, ) session_id: str | None = None try: session_payload = await self.clawdforge.create_session( agent="claude", model=self.config.model, meta={ "crafting_table_job_id": job["id"], "finding_id": finding_id, "project_name": job["project_name"], "subproject": job["subproject_path"], }, ) session_id = session_payload.get("session_id") attempt.session_id = session_id # Materialize a worktree to read source context AND host the # patch. We re-use WorkspaceManager.materialize() with a # synthetic job_id keyed on attempt so the bare clone gets # reused but the worktree is unique per attempt. patch_job_id = f"patch-{job['id']}-{finding_id}-{attempt_number}" paths = await self._materialize_worktree( project=job["project_name"], git_url=project["git_url"], branch=job["branch"], patch_job_id=patch_job_id, ) try: # 1. Build prompt with source context source_excerpt = _read_source_context( paths.worktree_dir / (sub.get("path") if sub else "."), finding.get("file") or "", finding.get("line") or 1, ) prompt = _build_prompt( finding=finding, source_excerpt=source_excerpt, language=language, git_url=project["git_url"], branch=job["branch"], subproject=job["subproject_path"], ) # 2. Send turn turn_payload = await self.clawdforge.turn(session_id, prompt) model_text = turn_text(turn_payload) parsed = extract_diff_json(model_text) if parsed is None or not isinstance(parsed.get("diff"), str): attempt.status = "drafted" attempt.error = "malformed_response" return await self._persist(attempt) diff_text = parsed["diff"] attempt.diff_excerpt = "\n".join(diff_text.splitlines()[:30]) # 3. Apply applied = self._apply_diff_to_worktree(paths.worktree_dir, diff_text) if not applied: attempt.status = "apply_failed" attempt.error = "git apply rejected the diff" return await self._persist(attempt) # 4. Verify by re-running the same recipe. verify_ok = await self._verify( job=job, snapshot=snapshot, paths=paths, finding=finding, ) if not verify_ok: attempt.status = "verify_failed" attempt.error = ( "recipe still failed after patch (or new findings appeared)" ) return await self._persist(attempt) # 5. Commit + push to a branch on origin. pushed_branch = self._commit_and_push( worktree_dir=paths.worktree_dir, branch_name=branch_name, finding=finding, explanation=parsed.get("explanation") or "auto-patch", ) if not pushed_branch: attempt.status = "verify_failed" attempt.error = "git push failed" return await self._persist(attempt) attempt.status = "pushed" # 6. Open Gitea PR. pr_url = await self._open_pr( project=project, branch_name=branch_name, base_branch=job["branch"], finding=finding, explanation=parsed.get("explanation") or "auto-patch", confidence=parsed.get("confidence") or "medium", diff_excerpt=attempt.diff_excerpt or "", ) if pr_url: attempt.pr_url = pr_url attempt.status = "pr_opened" return await self._persist(attempt) finally: # Always clean up worktree to avoid /workspace bloat. try: await self.workspace.cleanup(paths) except Exception as e: # pragma: no cover - defensive log.warning("patcher: worktree cleanup failed: %s", e) except httpx.HTTPError as e: attempt.status = "failed" attempt.error = f"http: {e!s}"[:400] return await self._persist(attempt) except Exception as e: log.exception("patcher: unexpected error on finding %s", finding_id) attempt.status = "failed" attempt.error = f"{type(e).__name__}: {e!s}"[:400] return await self._persist(attempt) finally: if session_id is not None: try: await self.clawdforge.close_session(session_id) except Exception as e: # pragma: no cover - defensive log.warning("patcher: clawdforge close failed: %s", e) # ---------- internals --------------------------------------------------- async def _materialize_worktree( self, *, project: str, git_url: str, branch: str, patch_job_id: str, ): """Materialize a fresh worktree for the patch attempt. Uses a scratch in-memory log buffer because the patch attempt is its own thing — separate from the originating job's recipe log.""" from io import StringIO log_fh = StringIO() return await self.workspace.materialize( project=project, job_id=patch_job_id, git_url=git_url, branch=branch, log_fh=log_fh, ) def _apply_diff_to_worktree(self, worktree_dir: Path, diff_text: str) -> bool: """Run ``git apply --check`` then ``git apply`` against the diff. Returns True on success. We use --whitespace=nowarn because clippy / mypy / ruff suggested fixes occasionally have trailing whitespace that would otherwise reject in strict mode. """ diff_path = worktree_dir / ".crafting-patch.diff" try: diff_path.write_text(diff_text, encoding="utf-8") except OSError as e: log.warning("patcher: could not write diff: %s", e) return False try: check = subprocess.run( ["git", "apply", "--check", "--whitespace=nowarn", str(diff_path)], cwd=str(worktree_dir), capture_output=True, text=True, timeout=60, ) if check.returncode != 0: log.info("patcher: git apply --check failed: %s", check.stderr.strip()) return False applied = subprocess.run( ["git", "apply", "--whitespace=nowarn", str(diff_path)], cwd=str(worktree_dir), capture_output=True, text=True, timeout=60, ) return applied.returncode == 0 except (subprocess.TimeoutExpired, OSError) as e: log.warning("patcher: git apply error: %s", e) return False finally: try: diff_path.unlink() except OSError: pass async def _verify( self, *, job: dict, snapshot: dict, paths, finding: dict, ) -> bool: """Re-run the originating recipe against the patched worktree. Strategy: invoke the runner's own subprocess primitive (``_exec_recipe``) so we get the same pump / timeout / process group semantics as the original job. Fall back to a plain subprocess call if the runner is None (test contexts). We ALSO check that the original kind/code finding is gone post- patch — useful for lint where the recipe might still exit nonzero for unrelated reasons. """ sub = _find_subproject(snapshot, job["subproject_path"]) if sub is None: return False recipe_kind = job["recipe"] cmd = sub.get(recipe_kind) if not cmd: return False sub_path = sub.get("path", ".") cwd = paths.worktree_dir / sub_path timeout = ( self.config.verify_timeout_secs or int(sub.get("timeout_secs") or 1800) ) from io import StringIO log_fh = StringIO() if self.runner is not None and hasattr(self.runner, "_exec_recipe"): try: exit_code, timed_out = await self.runner._exec_recipe( cmd=cmd, cwd=str(cwd), log_fh=log_fh, timeout=timeout ) except Exception as e: log.warning("patcher: verify run failed via runner: %s", e) return False if timed_out or exit_code != 0: # For lint findings, the original code/file/line being gone # is a stronger signal than exit_code=0 — but if the recipe # exits nonzero with our specific finding's code still in # the output, that's a clear failure. output = log_fh.getvalue() fcode = (finding.get("code") or "").strip() if fcode and fcode in output: return False # exit nonzero w/ finding's code GONE may still be a fail # (other lints fired); be conservative and return False. return False return True # Fallback path for test environments that hand us a stub runner. proc = await asyncio.create_subprocess_shell( cmd, cwd=str(cwd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) try: await asyncio.wait_for(proc.wait(), timeout=timeout) except asyncio.TimeoutError: proc.kill() await proc.wait() return False return proc.returncode == 0 def _commit_and_push( self, *, worktree_dir: Path, branch_name: str, finding: dict, explanation: str, ) -> bool: """Commit the worktree changes to a new branch and push to origin. Author defaults to ``crafting-table ``, overridable via ``CRAFTING_PATCHER_AUTHOR_NAME`` + ``CRAFTING_PATCHER_AUTHOR_EMAIL``. We pass through --no-gpg-sign because crafting-table containers don't have signing keys; commit messages reference the finding id so the PR review can navigate back to the finding row in the API. """ import os author_name = os.environ.get("CRAFTING_PATCHER_AUTHOR_NAME", "crafting-table") author_email = os.environ.get("CRAFTING_PATCHER_AUTHOR_EMAIL", "crafting-table@localhost") env = { "GIT_AUTHOR_NAME": author_name, "GIT_AUTHOR_EMAIL": author_email, "GIT_COMMITTER_NAME": author_name, "GIT_COMMITTER_EMAIL": author_email, "PATH": "/usr/local/bin:/usr/bin:/bin", } msg = ( f"crafting-table auto-patch: {finding.get('code') or finding.get('kind')}\n" f"\n" f"finding #{finding.get('id')}: {(finding.get('message') or '').splitlines()[0][:120]}\n" f"\n" f"{explanation}\n" ) try: subprocess.run( ["git", "checkout", "-b", branch_name], cwd=str(worktree_dir), check=True, capture_output=True, timeout=30, ) subprocess.run( ["git", "add", "-A"], cwd=str(worktree_dir), check=True, capture_output=True, timeout=30, ) subprocess.run( ["git", "commit", "-m", msg, "--no-gpg-sign"], cwd=str(worktree_dir), env=env, check=True, capture_output=True, timeout=30, ) subprocess.run( ["git", "push", "origin", branch_name], cwd=str(worktree_dir), check=True, capture_output=True, timeout=120, ) return True except subprocess.CalledProcessError as e: log.warning( "patcher: git step failed: %s\nstdout=%s\nstderr=%s", e, (e.stdout or b"").decode("utf-8", "replace")[:400], (e.stderr or b"").decode("utf-8", "replace")[:400], ) return False except (subprocess.TimeoutExpired, OSError) as e: log.warning("patcher: git push timed out / errored: %s", e) return False async def _open_pr( self, *, project: dict, branch_name: str, base_branch: str, finding: dict, explanation: str, confidence: str, diff_excerpt: str, ) -> str | None: """Open a Gitea PR for the pushed branch. Returns the html_url on success, None on auth/network failure.""" owner_repo = GiteaClient.parse_repo(project["git_url"]) if owner_repo is None: log.warning( "patcher: could not parse owner/repo from %s", project["git_url"] ) return None owner, repo = owner_repo title = f"[crafting-table] auto-patch {finding.get('code') or finding.get('kind') or ''}".strip() body = ( f"Automated patch drafted by crafting-table for finding " f"#{finding.get('id')} ({finding.get('kind')} / " f"{finding.get('code') or 'no-code'}).\n\n" f"**Severity**: {finding.get('severity')}\n" f"**File**: `{finding.get('file') or 'unknown'}` " f"line {finding.get('line') or '?'}\n" f"**Message**: {finding.get('message') or ''}\n\n" f"**Explanation**: {explanation}\n" f"**Confidence**: {confidence}\n\n" f"### Diff (first 30 lines)\n```diff\n{diff_excerpt}\n```\n\n" f"_Verify recipe re-ran cleanly on the patched worktree before " f"this PR was opened._" ) try: payload = await self.gitea.open_pr( owner=owner, repo=repo, title=title, body=body, head=branch_name, base=base_branch, ) except httpx.HTTPError as e: log.warning("patcher: gitea PR open failed: %s", e) return None url = payload.get("html_url") or payload.get("url") return str(url) if url else None async def _persist(self, attempt: PatchAttempt) -> PatchAttempt: row_id = await self.db.arun( self.db.insert_patch_attempt, finding_id=attempt.finding_id, job_id=attempt.job_id, project_name=attempt.project_name, attempt_number=attempt.attempt_number, status=attempt.status, branch_name=attempt.branch_name, pr_url=attempt.pr_url, diff_excerpt=attempt.diff_excerpt, session_id=attempt.session_id, error=attempt.error, ) attempt.id = row_id return attempt # --- helpers ---------------------------------------------------------------- _SEVERITY_RANK = { "critical": 5, "high": 4, "error": 3, "warn": 2, "warning": 2, "medium": 2, "info": 1, "low": 1, } def _severity_rank(finding: dict) -> int: return _SEVERITY_RANK.get((finding.get("severity") or "").lower(), 0) def _finding_is_fixable(f: dict) -> bool: if not isinstance(f, dict): return False kind = f.get("kind") if kind == "lint": return bool(f.get("file") and f.get("line")) if kind == "cve": return bool(f.get("suggested_fix") or f.get("code")) return False def _find_subproject(snapshot: dict, path: str) -> dict | None: for s in snapshot.get("subprojects", []): if s.get("path") == path: return s return None __all__ = [ "PatcherConfig", "Patcher", "PatchAttempt", "PatchStatus", "ClawdforgeClient", "GiteaClient", "findings_were_actionable", "extract_diff_json", "turn_text", ]