v0.1 wave 2A (steps 5+6): per-language parsers + findings extraction

- parsers/ package: rust / python / go / typescript / generic
- parser registry with language+recipe -> fallback resolution
- fingerprint hash (kind+file+line+code) for cross-run dedup
- runner.py post-exec hook: parse log, persist findings, count on job row
  (extraction runs before mark_job_finished so callers polling on terminal
  status see findings_count populated atomically)
- db.insert_finding / list_findings / increment_findings_count DAOs already
  shipped in wave 1; wired here
- GET /jobs/{id}/findings now returns real data (server route already
  shipped; was returning empty list because nothing populated the table)
- tests/test_parsers/: 6 modules + 11 fixtures (rust/python/go/typescript)
- tests/test_runner_findings.py: 3 integration tests
- README: tick steps 2-6, add Findings section

Suite: 108 passing (62 wave-1 + 46 new).
Spec: memory/spec-crafting-table.md
This commit is contained in:
Kayos 2026-04-29 08:32:56 -07:00
parent 98306ca2e0
commit d467b2f5be
30 changed files with 1968 additions and 5 deletions

View file

@ -15,14 +15,14 @@ through clawdforge.
Spec: `Sulkta-Coop/openclaw-workspace/memory/spec-crafting-table.md` (LAN-only).
## Status — v0.1
## Status — v0.1 step 6 of 10
- [x] Step 1: Dockerfile + per-language smoke
- [x] Step 2: SQLite ledger + project registry
- [x] Step 3: HTTP API skeleton (FastAPI, port 8810)
- [x] Step 4: Job runner core (asyncio worker pool, git worktree, subprocess)
- [ ] Step 5: Per-language parsers (Rust / Python / Go / TS first)
- [ ] Step 6: Findings extraction + storage
- [x] Step 5: Per-language parsers (Rust / Python / Go / TS first)
- [x] Step 6: Findings extraction + storage
- [ ] Step 7: MCP server (stdio JSON-RPC, 8 tools)
- [x] Step 8: Email digest scheduler
- [ ] Step 9: Autonomous patch loop (clawdforge integration)
@ -70,7 +70,7 @@ override via `CRAFTING_LAN_CIDRS`.
| GET | `/jobs?project=&status=&limit=` | any | List own (or all if admin) |
| GET | `/jobs/{id}` | owner | State + last 200 log lines |
| GET | `/jobs/{id}/log` | owner | Full log (file stream) |
| GET | `/jobs/{id}/findings` | owner | Structured findings (wave 1: empty) |
| GET | `/jobs/{id}/findings` | owner | Structured findings (see Findings) |
Cross-token access returns **404, not 403** — same existence-leak guard as
clawdforge sessions.
@ -222,6 +222,65 @@ pip install -e '.[test]'
pytest tests/
```
## Findings
After every job, the runner reads the captured log and hands it to a
per-language parser. The parser turns native tool output (clippy JSON,
ruff JSON, govulncheck NDJSON, eslint JSON, tsc human errors, etc.) into
structured rows in the `findings` table.
### Parsers in v0.1
| Language | Recipes parsed | Tool output expected |
|-------------|------------------------------|------------------------------------------------------|
| rust | audit, lint, test, build | `cargo audit --json`, `cargo clippy --message-format=json`, `cargo test` (human) |
| python | audit, lint, test, build | `pip-audit -f json`, `ruff --output-format=json`, `mypy --output=json`, `pytest --tb=line` |
| go | audit, lint, build, test | `govulncheck -json`, `go vet -json` |
| typescript | lint, build, test, audit | `eslint -f json`, `tsc --noEmit` (stderr) |
| javascript | (alias of typescript) | same |
| _other_ | (any) | falls back to `GenericParser` — emits one `recipe_fail` row when exit_code != 0, else nothing |
Resolution order in the registry: exact `language` match (parsers
self-gate on recipe via `Parser.matches`), then `GenericParser`. Adding a
new language is a single file in `crafting_table/parsers/` plus an entry
in `PARSERS`.
### Finding kinds
- `lint` — clippy / ruff / mypy / eslint / tsc / go vet diagnostic.
- `cve` — vulnerability from cargo-audit / pip-audit / govulncheck. Carries
`code` = advisory id (RUSTSEC-..., GO-..., PYSEC-...) and a
`suggested_fix` of the form "bump <pkg> to <version>" when patched
versions are known.
- `test_fail` — failed test name extracted from `cargo test` /
`pytest` output.
- `recipe_fail` — fallback when no language-specific parser fired and the
recipe exited non-zero. `code` = `exit_<n>`, message names the recipe.
### Fingerprints + dedup
Every finding row carries a 16-char `fingerprint` hash over
`kind|file|line|code` (NOT the message — tool wording drifts). The same
lint reappearing across nightly runs produces the same fingerprint, so a
later wave can dedup digest output and surface only "new since last run."
### Consuming findings
```
GET /jobs/{job_id}/findings
→ {"ok": true, "findings": [
{"id": 1, "job_id": "...", "kind": "lint", "severity": "warn",
"file": "src/app.py", "line": 3, "code": "F401",
"message": "...", "suggested_fix": "...", "fingerprint": "...",
"raw_json": "{...}", "created_at": ...},
...
]}
```
Authorization is project-token-scoped (same model as `/jobs/{id}`). The
matching `job` row's `findings_count` mirrors the array length so
callers can decide whether to fetch the detail.
## Digest
Daily 06:00 PT email digest. One message per project per day; aggregates the

View file

@ -1,6 +1,7 @@
"""crafting-table — polyglot dev/build/audit container.
Wave 1 (steps 2+3+4): SQLite ledger + FastAPI skeleton + async job runner.
Wave 1 (steps 2+3+4): SQLite ledger + FastAPI skeleton + async job runner.
Wave 2A (steps 5+6): per-language parsers + findings extraction.
Spec: memory/spec-crafting-table.md
"""

View file

@ -0,0 +1,71 @@
"""Per-language parsers — turn tool output into structured Finding rows.
Wave 2A (steps 5+6 of the spec). The runner calls `find_parser(language,
recipe)` after a job's subprocess exits, hands the parser the captured log +
exit code + recipe, and gets back a list of Finding dataclasses to persist.
Resolution order in `find_parser`:
1. exact match `language:recipe` (e.g. ``rust:audit``)
2. language-only match (e.g. ``rust`` for any rust recipe)
3. `GenericParser` fallback emits one ``recipe_fail`` finding when the
recipe exited non-zero, otherwise empty.
Each parser is best-effort: if its expected JSON shape doesn't parse it
should fall back to "no findings, don't crash" rather than raising. The
runner wraps the call in a try/except as a belt-and-braces second line.
"""
from __future__ import annotations
from .base import Finding, Parser, fingerprint
from .generic import GenericParser
from .go import GoParser
from .python import PythonParser
from .rust import RustParser
from .typescript import TypeScriptParser
# Order matters only for the language-only fallback step inside
# find_parser — exact-match keys are fully qualified so collisions are
# impossible there.
PARSERS: dict[str, type[Parser]] = {
"rust": RustParser,
"python": PythonParser,
"go": GoParser,
"typescript": TypeScriptParser,
# JS uses the same eslint/tsc machinery as ts.
"javascript": TypeScriptParser,
}
def find_parser(language: str, recipe: str) -> type[Parser]:
"""Pick the most specific parser for (language, recipe).
The Parser protocol exposes a `matches` classmethod so a single parser
class can self-gate on (language, recipe) useful when one class
handles multiple recipes (e.g. RustParser handles audit / lint / test).
The resolution loop here is a thin wrapper around that.
"""
language = (language or "").strip().lower()
recipe = (recipe or "").strip().lower()
# Step 1+2: ask each registered parser if it claims this combo.
candidate = PARSERS.get(language)
if candidate is not None and candidate.matches(language, recipe):
return candidate
# Step 3: generic fallback. Always claims everything.
return GenericParser
__all__ = [
"Finding",
"Parser",
"fingerprint",
"find_parser",
"PARSERS",
"GenericParser",
"RustParser",
"PythonParser",
"GoParser",
"TypeScriptParser",
]

View file

@ -0,0 +1,110 @@
"""Parser protocol + Finding dataclass + fingerprint helper.
A Finding mirrors the columns in the `findings` table (see db.py migration
005). Parsers produce a list of these; the runner persists them with a
fingerprint computed via `fingerprint()` so the same lint reappearing across
nightly runs deduplicates cleanly.
The fingerprint deliberately excludes `message` because tool wording drifts
version-to-version (clippy especially loves to rephrase). The locator
kind+file+line+code is what makes "this is the same finding" stable.
"""
from __future__ import annotations
import hashlib
from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable
@dataclass
class Finding:
"""Structured finding row. ``raw_json`` is an optional escape hatch — the
full original JSON object from the tool, serialized so that callers /
later analysis can re-extract fields we didn't break out into columns
(e.g. clippy's `spans[]` array, audit's full advisory body).
"""
kind: str # "lint" | "cve" | "test_fail" | "recipe_fail" | ...
severity: str # "info" | "warn" | "error" | "high" | "critical"
message: str
file: str | None = None
line: int | None = None
code: str | None = None
suggested_fix: str | None = None
raw_json: str | None = None
# Some parsers emit metadata fields (package/version/fixed_in/advisory)
# that aren't first-class DB columns — those go into raw_json so the
# info isn't lost. Keep `extras` here for parsers to stash structured
# bits before we serialize.
extras: dict = field(default_factory=dict)
@runtime_checkable
class Parser(Protocol):
"""Per-language parser. Implementations are stateless — every method is a
classmethod so we don't bother instantiating them; the registry holds
classes."""
@classmethod
def matches(cls, language: str, recipe: str) -> bool:
"""Does this parser claim (language, recipe)? Called by the registry
during resolution the parser owns the decision so we can register
multi-recipe parsers (e.g. RustParser handles all rust recipes)."""
...
@classmethod
def parse(cls, raw_log: str, exit_code: int, recipe: str) -> list[Finding]:
"""Turn raw subprocess output into Finding rows. Must not raise on
malformed input return [] and let the caller log the recipe as
un-parsed."""
...
def fingerprint(
kind: str,
file: str | None,
line: int | None,
code: str | None,
message: str,
) -> str:
"""Stable 16-char hash over the locator parts of a finding.
`message` is intentionally NOT in the hash tool wording drifts
version-to-version so two consecutive nightlies would otherwise produce
different fingerprints for the same underlying issue. The locator
(kind+file+line+code) is what makes "this is the same finding"
stable across runs.
The 16-char truncation gives 64 bits of collision space, more than
enough for one project's findings table.
"""
h = hashlib.sha256()
h.update(f"{kind}|{file or ''}|{line or 0}|{code or ''}".encode("utf-8"))
return h.hexdigest()[:16]
def _safe_json_loads(s: str):
"""Try json.loads(s); return None on failure. Several parsers wrap this
so they can degrade to [] without bringing down the runner."""
import json
try:
return json.loads(s)
except (ValueError, TypeError):
return None
def _iter_jsonl(text: str):
"""Yield parsed JSON objects from NDJSON/JSON-Lines text, skipping any
line that doesn't parse. cargo clippy, mypy --output=json, and
govulncheck all emit JSON-lines."""
import json
for line in text.splitlines():
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except ValueError:
continue

View file

@ -0,0 +1,34 @@
"""Fallback parser — used when no language-specific parser claims the recipe.
Behavior is intentionally minimal. We don't try to extract anything from
arbitrary stdout for unknown (language, recipe) combos that's a v0.2
problem. All we do is: if the recipe exited non-zero, emit a single
``recipe_fail`` finding so the digest can flag "something went wrong" without
hand-grepping the log. exit_code 0 = no findings.
"""
from __future__ import annotations
from .base import Finding
class GenericParser:
@classmethod
def matches(cls, language: str, recipe: str) -> bool:
# Always true — the registry uses this as the last-resort fallback.
return True
@classmethod
def parse(cls, raw_log: str, exit_code: int, recipe: str) -> list[Finding]:
if exit_code == 0:
return []
return [
Finding(
kind="recipe_fail",
severity="warn",
code=f"exit_{exit_code}",
message=(
f"recipe '{recipe}' exited with status {exit_code}; "
f"see job log for details"
),
)
]

View file

@ -0,0 +1,172 @@
"""Go parser — go vet / govulncheck.
Recipes:
- ``lint`` ``go vet -json`` writes a per-package JSON envelope to
stderr keyed by package name "tool name" list of diagnostics.
- ``audit`` ``govulncheck -json`` emits NDJSON; we filter for
``message.finding`` records that carry an OSV id and trace.
- ``test`` / ``build`` defer to recipe_fail-on-nonzero.
"""
from __future__ import annotations
import json
from .base import Finding, _iter_jsonl, _safe_json_loads
class GoParser:
@classmethod
def matches(cls, language: str, recipe: str) -> bool:
return language == "go" and recipe in {"audit", "lint", "test", "build"}
@classmethod
def parse(cls, raw_log: str, exit_code: int, recipe: str) -> list[Finding]:
if recipe == "lint":
return cls._parse_govet(raw_log)
if recipe == "audit":
return cls._parse_govulncheck(raw_log)
if exit_code != 0:
return [
Finding(
kind="recipe_fail",
severity="warn",
code=f"exit_{exit_code}",
message=f"go {recipe} exited with status {exit_code}",
)
]
return []
# ---- go vet ------------------------------------------------------------
@classmethod
def _parse_govet(cls, raw_log: str) -> list[Finding]:
"""``go vet -json`` envelope:
{"package/path": {"analyzer-name": [{posn, message, ...}, ...]}}
``posn`` looks like ``/abs/path/file.go:LINE:COL``. We pluck the
leading object out of the log (it's preceded by `go vet`'s usual
chatter) and walk it.
"""
envelope = _extract_json_object(raw_log)
if not isinstance(envelope, dict):
return []
out: list[Finding] = []
for pkg, analyzers in envelope.items():
if not isinstance(analyzers, dict):
continue
for analyzer, items in analyzers.items():
if not isinstance(items, list):
continue
for diag in items:
if not isinstance(diag, dict):
continue
posn = diag.get("posn") or ""
file, line = _parse_posn(posn)
out.append(
Finding(
kind="lint",
severity="warn",
file=file,
line=line,
code=analyzer,
message=diag.get("message") or "",
raw_json=json.dumps({"package": pkg, "analyzer": analyzer, "diag": diag}),
)
)
return out
# ---- govulncheck -------------------------------------------------------
@classmethod
def _parse_govulncheck(cls, raw_log: str) -> list[Finding]:
"""govulncheck -json emits NDJSON. The records of interest are
``{"finding": {...}}`` carrying an ``osv`` id; we also accept
``{"vulnerability": {...}}`` which older versions emit.
Dedup by OSV id since one vuln may be reported per call site.
"""
seen: set[str] = set()
out: list[Finding] = []
for obj in _iter_jsonl(raw_log):
if not isinstance(obj, dict):
continue
finding = obj.get("finding") or obj.get("vulnerability")
if not isinstance(finding, dict):
continue
osv = finding.get("osv") or finding.get("id") or finding.get("OSV")
if not osv or osv in seen:
continue
seen.add(osv)
mod = finding.get("module") or finding.get("Module") or "?"
fix = finding.get("fixed_version") or finding.get("FixedVersion")
summary = (
finding.get("summary")
or finding.get("Summary")
or finding.get("description")
or "vulnerability"
)
out.append(
Finding(
kind="cve",
severity="high",
code=osv,
message=f"{mod}: {summary}" + (f" — fixed in {fix}" if fix else ""),
suggested_fix=f"bump {mod} to {fix}" if fix else None,
raw_json=json.dumps(obj),
extras={"package": mod, "fixed_in": fix, "advisory": osv},
)
)
return out
def _parse_posn(posn: str) -> tuple[str | None, int | None]:
"""Parse ``/path/to/file.go:LINE:COL`` (or ``file.go:LINE``) into
(file, line). Returns (None, None) on garbage input."""
if not posn:
return None, None
# Walk from the right so the first colon hit is the column or line.
parts = posn.rsplit(":", 2)
if len(parts) == 3:
file, line_s, _col = parts
try:
return file, int(line_s)
except ValueError:
return file, None
if len(parts) == 2:
file, line_s = parts
try:
return file, int(line_s)
except ValueError:
return file, None
return posn, None
def _extract_json_object(text: str) -> dict | None:
"""Find the first balanced ``{...}`` block and json.loads it."""
start = text.find("{")
while start != -1:
depth = 0
in_str = False
esc = False
for i in range(start, len(text)):
c = text[i]
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
continue
if c == '"':
in_str = True
elif c == "{":
depth += 1
elif c == "}":
depth -= 1
if depth == 0:
candidate = text[start : i + 1]
parsed = _safe_json_loads(candidate)
if isinstance(parsed, dict):
return parsed
break
start = text.find("{", start + 1)
return None

View file

@ -0,0 +1,277 @@
"""Python parser — ruff / mypy / pip-audit / pytest.
Recipes:
- ``lint`` try ruff JSON first (a top-level array), fall back to mypy
JSON-lines if the array parse fails. A user's lint recipe can also be
``ruff check . && mypy ...`` and we handle both shapes interleaved in
the same log.
- ``audit`` pip-audit -f json. Top-level object with ``dependencies[]``
each carrying ``vulns[]``.
- ``test`` pytest. Parse ``FAILED tests/...::name - reason`` lines.
- ``build`` defer; pip install / setup.py output isn't a useful
structured channel.
"""
from __future__ import annotations
import json
import re
from .base import Finding, _iter_jsonl, _safe_json_loads
class PythonParser:
@classmethod
def matches(cls, language: str, recipe: str) -> bool:
return language == "python" and recipe in {"audit", "lint", "test", "build"}
@classmethod
def parse(cls, raw_log: str, exit_code: int, recipe: str) -> list[Finding]:
if recipe == "lint":
return cls._parse_lint(raw_log)
if recipe == "audit":
return cls._parse_pip_audit(raw_log)
if recipe == "test":
return cls._parse_pytest(raw_log, exit_code)
# build
if exit_code != 0:
return [
Finding(
kind="recipe_fail",
severity="warn",
code=f"exit_{exit_code}",
message=f"python build exited with status {exit_code}",
)
]
return []
# ---- lint --------------------------------------------------------------
@classmethod
def _parse_lint(cls, raw_log: str) -> list[Finding]:
"""ruff emits a JSON array; mypy --output=json emits JSON-lines.
Both are common in a lint recipe (often `ruff && mypy`). We parse
whichever shape applies, attempting both."""
out: list[Finding] = []
out.extend(cls._parse_ruff(raw_log))
out.extend(cls._parse_mypy(raw_log))
return out
@classmethod
def _parse_ruff(cls, raw_log: str) -> list[Finding]:
# Find the JSON array — it might be preceded by a banner / shell echo.
arr = _extract_json_array(raw_log)
if not isinstance(arr, list):
return []
out: list[Finding] = []
for item in arr:
if not isinstance(item, dict):
continue
# Ruff entries: {code, message, filename, location:{row,column}, ...}
# We also tolerate mypy-shaped entries here in case they leak in;
# mypy's _parse will skip them.
if "code" not in item or "message" not in item:
continue
loc = item.get("location") or {}
file = item.get("filename")
row = loc.get("row") if isinstance(loc, dict) else None
fix = item.get("fix") or {}
suggested = None
if isinstance(fix, dict):
suggested = fix.get("message") or fix.get("applicability")
out.append(
Finding(
kind="lint",
severity="warn",
file=file,
line=row,
code=item.get("code"),
message=item.get("message") or "",
suggested_fix=suggested,
raw_json=json.dumps(item),
)
)
return out
@classmethod
def _parse_mypy(cls, raw_log: str) -> list[Finding]:
out: list[Finding] = []
for obj in _iter_jsonl(raw_log):
if not isinstance(obj, dict):
continue
# mypy JSON-line: {"file":..., "line":..., "column":...,
# "severity":"error"|"note", "message":...,
# "code":...}
if "file" not in obj or "message" not in obj or "severity" not in obj:
continue
sev_in = obj.get("severity") or "warn"
sev = "error" if sev_in == "error" else "warn"
out.append(
Finding(
kind="lint",
severity=sev,
file=obj.get("file"),
line=obj.get("line"),
code=obj.get("code") or "mypy",
message=obj.get("message") or "",
raw_json=json.dumps(obj),
)
)
return out
# ---- pip-audit ---------------------------------------------------------
@classmethod
def _parse_pip_audit(cls, raw_log: str) -> list[Finding]:
"""pip-audit -f json shape:
{"dependencies":[{"name":..., "version":...,
"vulns":[{"id":..., "fix_versions":[...],
"description":...}]}]}
"""
envelope = _extract_json_object(raw_log)
if envelope is None:
return []
deps = envelope.get("dependencies") or []
out: list[Finding] = []
for dep in deps:
if not isinstance(dep, dict):
continue
pkg = dep.get("name") or "?"
ver = dep.get("version") or "?"
for vuln in dep.get("vulns") or []:
if not isinstance(vuln, dict):
continue
vid = vuln.get("id") or "PYSEC-?"
desc = vuln.get("description") or "vulnerability"
fixes = vuln.get("fix_versions") or []
fix_str = ", ".join(fixes) if fixes else "no fix available"
out.append(
Finding(
kind="cve",
severity="high",
code=vid,
message=f"{pkg} {ver}: {desc} — fixed in {fix_str}",
suggested_fix=(
f"bump {pkg} to {fixes[0]}" if fixes else None
),
raw_json=json.dumps({"dep": dep, "vuln": vuln}),
extras={
"package": pkg,
"version": ver,
"fixed_in": fixes,
"advisory": vid,
},
)
)
return out
# ---- pytest ------------------------------------------------------------
_PYTEST_FAILED_RE = re.compile(r"^FAILED\s+(\S+)\s*(?:-\s*(.+))?$")
@classmethod
def _parse_pytest(cls, raw_log: str, exit_code: int) -> list[Finding]:
if exit_code == 0:
return []
out: list[Finding] = []
seen: set[str] = set()
for line in raw_log.splitlines():
m = cls._PYTEST_FAILED_RE.match(line.strip())
if not m:
continue
name = m.group(1)
reason = (m.group(2) or "").strip()
if name in seen:
continue
seen.add(name)
# Split file::test_name to fill `file` column when possible.
file: str | None = None
if "::" in name:
file = name.split("::", 1)[0]
out.append(
Finding(
kind="test_fail",
severity="error",
file=file,
code=name,
message=f"pytest {name} failed" + (f": {reason}" if reason else ""),
)
)
if not out:
out.append(
Finding(
kind="test_fail",
severity="error",
code=f"exit_{exit_code}",
message=(
f"pytest exited {exit_code} but no FAILED lines "
f"detected; test process exited non-zero"
),
)
)
return out
def _extract_json_array(text: str) -> list | None:
"""Find the first balanced ``[...]`` block and json.loads it."""
start = text.find("[")
while start != -1:
depth = 0
in_str = False
esc = False
for i in range(start, len(text)):
c = text[i]
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
continue
if c == '"':
in_str = True
elif c == "[":
depth += 1
elif c == "]":
depth -= 1
if depth == 0:
candidate = text[start : i + 1]
parsed = _safe_json_loads(candidate)
if isinstance(parsed, list):
return parsed
break
start = text.find("[", start + 1)
return None
def _extract_json_object(text: str) -> dict | None:
"""Like _extract_json_array but for objects."""
start = text.find("{")
while start != -1:
depth = 0
in_str = False
esc = False
for i in range(start, len(text)):
c = text[i]
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
continue
if c == '"':
in_str = True
elif c == "{":
depth += 1
elif c == "}":
depth -= 1
if depth == 0:
candidate = text[start : i + 1]
parsed = _safe_json_loads(candidate)
if isinstance(parsed, dict):
return parsed
break
start = text.find("{", start + 1)
return None

View file

@ -0,0 +1,235 @@
"""Rust parser — clippy / cargo audit / cargo test.
Recipes handled:
- ``audit`` cargo audit --json envelope list of CVE findings
- ``lint`` cargo clippy --message-format=json (NDJSON) lint findings
- ``test`` cargo test human output (no good machine format) failures
- ``build`` falls through to the generic recipe_fail behavior because
build success/failure is captured by exit_code alone; structured build
errors come through clippy on the lint recipe.
Each branch degrades gracefully: malformed JSON empty findings, not
crash. The runner logs the parse failure and still records the job as
finished.
"""
from __future__ import annotations
import json
import re
from .base import Finding, _iter_jsonl, _safe_json_loads
class RustParser:
@classmethod
def matches(cls, language: str, recipe: str) -> bool:
return language == "rust" and recipe in {"audit", "lint", "test", "build"}
@classmethod
def parse(cls, raw_log: str, exit_code: int, recipe: str) -> list[Finding]:
if recipe == "audit":
return cls._parse_audit(raw_log)
if recipe == "lint":
return cls._parse_clippy(raw_log)
if recipe == "test":
return cls._parse_test(raw_log, exit_code)
# build: defer to generic-style behaviour. We don't try to parse
# cargo build output here; lint + clippy is the structured channel.
if exit_code != 0:
return [
Finding(
kind="recipe_fail",
severity="warn",
code=f"exit_{exit_code}",
message=f"cargo build exited with status {exit_code}",
)
]
return []
# ---- audit -------------------------------------------------------------
@classmethod
def _parse_audit(cls, raw_log: str) -> list[Finding]:
"""cargo-audit emits a single JSON envelope on stdout when invoked
with --json. Shape:
{"vulnerabilities": {"list": [{"package": {...}, "advisory": {...},
"versions": {"patched": [...]}}, ...]}}
We extract the JSON object substring (the recipe usually echoes
other text first) and pull each vulnerability out.
"""
envelope = _extract_json_object(raw_log)
if envelope is None:
return []
vulns = (envelope.get("vulnerabilities") or {}).get("list") or []
out: list[Finding] = []
for v in vulns:
pkg = (v.get("package") or {}).get("name") or "?"
ver = (v.get("package") or {}).get("version") or "?"
adv = v.get("advisory") or {}
adv_id = adv.get("id") or "RUSTSEC-?"
title = adv.get("title") or adv.get("description") or "advisory"
patched = (v.get("versions") or {}).get("patched") or []
patched_str = ", ".join(patched) if patched else "no fix available"
out.append(
Finding(
kind="cve",
severity="high",
code=adv_id,
message=f"{pkg} {ver}: {title} — patched in {patched_str}",
suggested_fix=(
f"bump {pkg} to {patched[0]}" if patched else None
),
raw_json=json.dumps(v),
extras={
"package": pkg,
"version": ver,
"fixed_in": patched,
"advisory": adv_id,
},
)
)
return out
# ---- clippy ------------------------------------------------------------
@classmethod
def _parse_clippy(cls, raw_log: str) -> list[Finding]:
"""cargo clippy --message-format=json emits NDJSON. Each line is a
cargo build-message; the ones we care about have:
reason == "compiler-message"
message.level in {"warning", "error"}
"""
out: list[Finding] = []
for obj in _iter_jsonl(raw_log):
if not isinstance(obj, dict):
continue
if obj.get("reason") != "compiler-message":
continue
msg = obj.get("message") or {}
level = msg.get("level")
if level not in {"warning", "error"}:
continue
code_obj = msg.get("code") or {}
code = code_obj.get("code") if isinstance(code_obj, dict) else None
spans = msg.get("spans") or []
primary = next(
(s for s in spans if s.get("is_primary")),
spans[0] if spans else None,
)
file = primary.get("file_name") if primary else None
line = primary.get("line_start") if primary else None
children = msg.get("children") or []
suggested = None
for ch in children:
rendered = ch.get("rendered")
if rendered:
suggested = rendered
break
severity = "error" if level == "error" else "warn"
out.append(
Finding(
kind="lint",
severity=severity,
file=file,
line=line,
code=code,
message=msg.get("message") or "",
suggested_fix=suggested,
raw_json=json.dumps(obj),
)
)
return out
# ---- test --------------------------------------------------------------
_TEST_FAIL_RE = re.compile(r"^\s*test\s+(\S+)\s+\.{3}\s+FAILED\s*$")
_FAILURES_RE = re.compile(r"^\s*failures:\s*$")
@classmethod
def _parse_test(cls, raw_log: str, exit_code: int) -> list[Finding]:
"""cargo test prints human-formatted output by default. Two reliable
signals:
1. ``test foo::bar ... FAILED`` lines from the runner.
2. ``failures:`` block listing the failed tests indented.
We collect the FAILED line names since they appear once per failure
and are the cleanest extraction. exit_code == 0 means no failures.
"""
if exit_code == 0:
return []
names: list[str] = []
for raw_line in raw_log.splitlines():
m = cls._TEST_FAIL_RE.match(raw_line)
if m:
names.append(m.group(1))
# Dedup while preserving order.
seen: set[str] = set()
unique: list[str] = []
for n in names:
if n in seen:
continue
seen.add(n)
unique.append(n)
if not unique:
return [
Finding(
kind="test_fail",
severity="error",
code=f"exit_{exit_code}",
message=(
f"cargo test exited {exit_code} but no FAILED lines "
f"detected; check log"
),
)
]
return [
Finding(
kind="test_fail",
severity="error",
code=name,
message=f"test {name} failed",
)
for name in unique
]
def _extract_json_object(text: str) -> dict | None:
"""Pull the first balanced ``{...}`` block out of ``text`` and json.loads it.
cargo-audit's --json output is a single object but the recipe shell
might echo a banner before/after. Scan for the first '{' and walk
braces (string-aware) to find its match. Falls back to None.
"""
start = text.find("{")
while start != -1:
depth = 0
in_str = False
esc = False
for i in range(start, len(text)):
c = text[i]
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
continue
if c == '"':
in_str = True
elif c == "{":
depth += 1
elif c == "}":
depth -= 1
if depth == 0:
candidate = text[start : i + 1]
parsed = _safe_json_loads(candidate)
if isinstance(parsed, dict):
return parsed
break
start = text.find("{", start + 1)
return None

View file

@ -0,0 +1,141 @@
"""TypeScript / JavaScript parser — eslint / tsc.
Recipes:
- ``lint`` eslint -f json (top-level array of file results) AND tsc
--noEmit (human-formatted lines like ``path.ts(L,C): error TS2304: ...``).
The user's lint command may run either or both.
- ``audit`` left to the generic recipe_fail behaviour for now;
npm audit's JSON shape is busy and changes versions a lot, parser deferred
to v0.2.
"""
from __future__ import annotations
import json
import re
from .base import Finding, _safe_json_loads
class TypeScriptParser:
@classmethod
def matches(cls, language: str, recipe: str) -> bool:
return language in {"typescript", "javascript"} and recipe in {
"audit",
"lint",
"test",
"build",
}
@classmethod
def parse(cls, raw_log: str, exit_code: int, recipe: str) -> list[Finding]:
if recipe == "lint":
out = cls._parse_eslint(raw_log)
out.extend(cls._parse_tsc(raw_log))
return out
if exit_code != 0:
return [
Finding(
kind="recipe_fail",
severity="warn",
code=f"exit_{exit_code}",
message=f"ts {recipe} exited with status {exit_code}",
)
]
return []
# ---- eslint ------------------------------------------------------------
@classmethod
def _parse_eslint(cls, raw_log: str) -> list[Finding]:
"""eslint -f json emits a top-level array: one entry per file with a
nested ``messages[]`` carrying ``{ruleId, severity, line, column,
message}``. severity 1=warn, 2=error.
"""
arr = _extract_json_array(raw_log)
if not isinstance(arr, list):
return []
out: list[Finding] = []
for file_result in arr:
if not isinstance(file_result, dict):
continue
file = file_result.get("filePath") or file_result.get("filename")
for msg in file_result.get("messages") or []:
if not isinstance(msg, dict):
continue
sev_int = msg.get("severity")
sev = "error" if sev_int == 2 else "warn"
out.append(
Finding(
kind="lint",
severity=sev,
file=file,
line=msg.get("line"),
code=msg.get("ruleId") or "eslint",
message=msg.get("message") or "",
raw_json=json.dumps({"file": file, "msg": msg}),
)
)
return out
# ---- tsc ---------------------------------------------------------------
_TSC_RE = re.compile(
r"^(?P<file>[^\s(][^()\n]*?)\((?P<line>\d+),(?P<col>\d+)\):\s+"
r"(?P<sev>error|warning)\s+(?P<code>TS\d+):\s+(?P<msg>.+)$"
)
@classmethod
def _parse_tsc(cls, raw_log: str) -> list[Finding]:
out: list[Finding] = []
for line in raw_log.splitlines():
m = cls._TSC_RE.match(line)
if not m:
continue
sev = "error" if m.group("sev") == "error" else "warn"
try:
lineno = int(m.group("line"))
except ValueError:
lineno = None
out.append(
Finding(
kind="lint",
severity=sev,
file=m.group("file"),
line=lineno,
code=m.group("code"),
message=m.group("msg"),
)
)
return out
def _extract_json_array(text: str) -> list | None:
start = text.find("[")
while start != -1:
depth = 0
in_str = False
esc = False
for i in range(start, len(text)):
c = text[i]
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
continue
if c == '"':
in_str = True
elif c == "[":
depth += 1
elif c == "]":
depth -= 1
if depth == 0:
candidate = text[start : i + 1]
parsed = _safe_json_loads(candidate)
if isinstance(parsed, list):
return parsed
break
start = text.find("[", start + 1)
return None

View file

@ -40,6 +40,7 @@ from pathlib import Path
from typing import Awaitable, Callable
from .db import DB
from .parsers import Finding, find_parser, fingerprint
from .workspace import WorkspaceManager, WorkspacePaths
@ -272,6 +273,23 @@ class Runner:
await self.workspace.cleanup(paths)
# Wave 2A — extract findings from the captured log BEFORE marking the
# job finished. Callers that poll on terminal status expect the
# findings_count and /findings rows to be populated by the time the
# job leaves the running state; doing the parse pass first keeps that
# invariant. Skipped on cancellation (no useful output) and on
# workspace failure (exit_code is a synthetic crafting-table sentinel,
# not a tool exit). Timed-out jobs ARE parsed: a timeout still
# produces real partial output, and recipe_fail-on-nonzero is useful.
findings_n = 0
if terminal_status not in {"cancelled"} and exit_code is not None and exit_code > -2:
findings_n = await self._extract_findings(
job_id=job_id,
ctx=ctx,
log_path=log_path,
exit_code=exit_code,
)
await self.db.arun(
self.db.mark_job_finished,
job_id=job_id,
@ -288,6 +306,7 @@ class Runner:
"status": terminal_status,
"exit_code": exit_code,
"log_path": str(log_path),
"findings_count": findings_n,
"finished_at": int(time.time()),
}
for hook in self._hooks:
@ -296,6 +315,67 @@ class Runner:
except Exception as e:
log.warning("jobs_finished hook failed: %s", e)
async def _extract_findings(
self,
*,
job_id: str,
ctx: _JobContext,
log_path: Path,
exit_code: int,
) -> int:
"""Parse the recipe's captured log into Finding rows and persist them.
Failure-tolerant: any exception in the parser path is logged and
swallowed bad parser output never marks a job failed. Returns the
number of findings persisted (may be zero).
"""
try:
language = (ctx.subproject.get("language") or "").lower()
recipe_kind = ctx.job["recipe"]
parser_cls = find_parser(language=language, recipe=recipe_kind)
log_text = self._read_log_safe(log_path)
findings: list[Finding] = parser_cls.parse(
log_text, exit_code, recipe_kind
)
count = 0
for f in findings:
fp = fingerprint(f.kind, f.file, f.line, f.code, f.message)
# raw_json: prefer the parser-supplied raw_json; if absent
# but extras has content, serialize that so callers don't
# lose the per-CVE / per-advisory metadata.
raw_json = f.raw_json
if raw_json is None and f.extras:
raw_json = json.dumps(f.extras)
await self.db.arun(
self.db.insert_finding,
job_id=job_id,
kind=f.kind,
severity=f.severity,
message=f.message,
fingerprint=fp,
file=f.file,
line=f.line,
code=f.code,
suggested_fix=f.suggested_fix,
raw_json=raw_json,
)
count += 1
if count:
await self.db.arun(self.db.increment_findings_count, job_id, count)
return count
except Exception as e:
log.warning("findings extraction failed for job %s: %s", job_id, e)
return 0
def _read_log_safe(self, log_path: Path) -> str:
"""Read the captured log; return '' on any I/O error so a missing log
file doesn't crash the parser pipeline."""
try:
return log_path.read_text(encoding="utf-8", errors="replace")
except OSError as e:
log.warning("failed to read log %s: %s", log_path, e)
return ""
async def _exec_recipe(
self, *, cmd: str, cwd: str, log_fh, timeout: int
) -> tuple[int, bool]:

View file

View file

@ -0,0 +1,22 @@
"""Local conftest for parser tests — exposes a fixtures-dir helper.
Parser tests don't need the server reload / TestClient machinery from the
top-level conftest; they only need to read fixture files. Keep them light.
"""
from __future__ import annotations
from pathlib import Path
import pytest
FIXTURES_DIR = Path(__file__).parent / "fixtures"
@pytest.fixture
def fixtures_dir() -> Path:
return FIXTURES_DIR
def load_fixture(*parts: str) -> str:
return (FIXTURES_DIR.joinpath(*parts)).read_text(encoding="utf-8")

View file

@ -0,0 +1,24 @@
go: downloading example.com/foo v1.2.3
{
"example.com/foo": {
"printf": [
{
"posn": "/work/foo/main.go:42:9",
"message": "Printf format %d has arg name of wrong type string"
},
{
"posn": "/work/foo/util.go:7:5",
"message": "Printf format %s has arg n of wrong type int"
}
],
"shadow": [
{
"posn": "/work/foo/main.go:55:13",
"message": "declaration of \"err\" shadows declaration at line 50"
}
]
},
"example.com/foo/sub": {
"shadow": []
}
}

View file

@ -0,0 +1,5 @@
{"config":{"protocol_version":"v1.0.0","scanner_name":"govulncheck"}}
{"progress":{"message":"Scanning your code…"}}
{"finding":{"osv":"GO-2023-1989","module":"golang.org/x/net","fixed_version":"v0.17.0","summary":"HTTP/2 rapid reset can cause excessive work in net/http"}}
{"finding":{"osv":"GO-2023-1989","module":"golang.org/x/net","fixed_version":"v0.17.0","summary":"HTTP/2 rapid reset can cause excessive work in net/http"}}
{"finding":{"osv":"GO-2024-2611","module":"google.golang.org/protobuf","fixed_version":"v1.33.0","summary":"Infinite loop in github.com/golang/protobuf"}}

View file

@ -0,0 +1,2 @@
{"file":"src/app.py","line":17,"column":4,"severity":"error","message":"Incompatible return value type (got \"str\", expected \"int\")","code":"return-value"}
{"file":"src/util.py","line":5,"column":1,"severity":"note","message":"Revealed type is \"builtins.int\"","code":null}

View file

@ -0,0 +1,20 @@
{
"dependencies": [
{
"name": "requests",
"version": "2.20.0",
"vulns": [
{
"id": "PYSEC-2018-28",
"fix_versions": ["2.20.1"],
"description": "Sensitive Authorization header sent on cross-origin redirect"
}
]
},
{
"name": "fastapi",
"version": "0.95.0",
"vulns": []
}
]
}

View file

@ -0,0 +1,21 @@
============================= test session starts ==============================
collected 5 items
tests/test_a.py::test_one PASSED [ 20%]
tests/test_a.py::test_two FAILED [ 40%]
tests/test_b.py::test_three PASSED [ 60%]
tests/test_b.py::test_four FAILED [ 80%]
tests/test_c.py::test_five PASSED [100%]
=================================== FAILURES ===================================
___________________________________ test_two ___________________________________
assert False
E AssertionError
__________________________________ test_four ___________________________________
assert 1 == 2
E AssertionError
=========================== short test summary info ============================
FAILED tests/test_a.py::test_two - AssertionError
FAILED tests/test_b.py::test_four - AssertionError: assert 1 == 2
========================= 2 failed, 3 passed in 0.12s ==========================

View file

@ -0,0 +1,16 @@
[
{
"code": "F401",
"message": "`os` imported but unused",
"filename": "/work/src/app.py",
"location": {"row": 3, "column": 1},
"end_location": {"row": 3, "column": 10},
"fix": {"applicability": "safe", "message": "Remove unused import"}
},
{
"code": "E501",
"message": "Line too long (102 > 88 characters)",
"filename": "/work/src/app.py",
"location": {"row": 42, "column": 89}
}
]

View file

@ -0,0 +1,2 @@
cargo audit fetched advisory database from https://github.com/RustSec/advisory-db
{"database":{"advisory_count":634},"lockfile":{"dependency_count":120},"settings":{},"vulnerabilities":{"found":true,"count":2,"list":[{"advisory":{"id":"RUSTSEC-2024-0123","title":"openssl: Use-after-free in SslContextBuilder","description":"Affected versions of this crate may use freed memory when…","date":"2024-08-12","aliases":["CVE-2024-12345"],"keywords":["use-after-free","openssl"],"categories":["memory-corruption"]},"versions":{"patched":[">=0.10.66"],"unaffected":[]},"affected":null,"package":{"name":"openssl","version":"0.10.55","source":"registry+https://github.com/rust-lang/crates.io-index","checksum":"abc123","dependencies":[]}},{"advisory":{"id":"RUSTSEC-2024-0099","title":"time: Out-of-bounds read in parse","description":"The time crate had an OOB read…","date":"2024-04-01","aliases":[],"keywords":["oob"],"categories":["denial-of-service"]},"versions":{"patched":[],"unaffected":[]},"affected":null,"package":{"name":"time","version":"0.2.27","source":"registry+https://github.com/rust-lang/crates.io-index","checksum":"def456","dependencies":[]}}]},"warnings":{}}

View file

@ -0,0 +1,5 @@
{"reason":"compiler-artifact","package_id":"foo 0.1.0","target":{"name":"foo"}}
{"reason":"compiler-message","package_id":"foo 0.1.0","target":{"name":"foo"},"message":{"rendered":"warning: unused variable: `x`\n --> src/lib.rs:12:9","level":"warning","code":{"code":"unused_variables","explanation":null},"message":"unused variable: `x`","spans":[{"file_name":"src/lib.rs","line_start":12,"line_end":12,"column_start":9,"column_end":10,"is_primary":true}],"children":[{"rendered":"help: if this is intentional, prefix it with an underscore: `_x`","level":"help","message":"if this is intentional, prefix it with an underscore","spans":[]}]}}
{"reason":"compiler-message","package_id":"foo 0.1.0","target":{"name":"foo"},"message":{"rendered":"error[E0382]: borrow of moved value: `s`","level":"error","code":{"code":"E0382","explanation":null},"message":"borrow of moved value: `s`","spans":[{"file_name":"src/main.rs","line_start":42,"line_end":42,"column_start":5,"column_end":10,"is_primary":true}],"children":[]}}
{"reason":"compiler-message","package_id":"foo 0.1.0","target":{"name":"foo"},"message":{"rendered":"note: ...","level":"note","code":null,"message":"note: nothing","spans":[],"children":[]}}
{"reason":"build-finished","success":true}

View file

@ -0,0 +1,23 @@
Finished test [unoptimized + debuginfo] target(s) in 0.45s
Running unittests src/lib.rs (target/debug/deps/foo-abc)
running 4 tests
test math::tests::adds_two ... ok
test math::tests::adds_negative ... FAILED
test parser::tests::parses_empty ... ok
test parser::tests::parses_garbage ... FAILED
failures:
---- math::tests::adds_negative stdout ----
thread 'math::tests::adds_negative' panicked at 'assertion failed', src/math.rs:14:5
---- parser::tests::parses_garbage stdout ----
thread 'parser::tests::parses_garbage' panicked at 'expected Err', src/parser.rs:33:5
failures:
math::tests::adds_negative
parser::tests::parses_garbage
test result: FAILED. 2 passed; 2 failed; 0 ignored; 0 measured; 0 filtered out

View file

@ -0,0 +1,30 @@
[
{
"filePath": "/work/src/index.ts",
"messages": [
{
"ruleId": "no-unused-vars",
"severity": 1,
"message": "'foo' is defined but never used.",
"line": 5,
"column": 7,
"nodeType": "Identifier"
},
{
"ruleId": "@typescript-eslint/no-explicit-any",
"severity": 2,
"message": "Unexpected any. Specify a different type.",
"line": 12,
"column": 18
}
],
"errorCount": 1,
"warningCount": 1
},
{
"filePath": "/work/src/util.ts",
"messages": [],
"errorCount": 0,
"warningCount": 0
}
]

View file

@ -0,0 +1,5 @@
src/index.ts(5,7): error TS2304: Cannot find name 'foo'.
src/index.ts(12,18): error TS7006: Parameter 'x' implicitly has an 'any' type.
src/util.ts(3,1): warning TS6133: 'unused' is declared but its value is never read.
Found 2 errors and 1 warning in 2 files.

View file

@ -0,0 +1,64 @@
"""Fingerprint helper unit tests.
Properties asserted:
- determinism (same args same hash)
- locator-only (changing message DOESN'T change fingerprint)
- file/line/code each contribute (changing any of them DOES change it)
- 16-char output, hex
"""
from __future__ import annotations
import re
from crafting_table.parsers.base import fingerprint
def test_fingerprint_deterministic():
a = fingerprint("lint", "src/x.py", 10, "F401", "unused import os")
b = fingerprint("lint", "src/x.py", 10, "F401", "unused import os")
assert a == b
def test_fingerprint_message_excluded():
"""Tool wording drifts; message must NOT contribute to the hash."""
a = fingerprint("lint", "src/x.py", 10, "F401", "unused import os")
b = fingerprint("lint", "src/x.py", 10, "F401", "wholly different wording")
assert a == b
def test_fingerprint_file_changes_hash():
a = fingerprint("lint", "src/x.py", 10, "F401", "msg")
b = fingerprint("lint", "src/y.py", 10, "F401", "msg")
assert a != b
def test_fingerprint_line_changes_hash():
a = fingerprint("lint", "src/x.py", 10, "F401", "msg")
b = fingerprint("lint", "src/x.py", 11, "F401", "msg")
assert a != b
def test_fingerprint_code_changes_hash():
a = fingerprint("lint", "src/x.py", 10, "F401", "msg")
b = fingerprint("lint", "src/x.py", 10, "E501", "msg")
assert a != b
def test_fingerprint_kind_changes_hash():
a = fingerprint("lint", "src/x.py", 10, "F401", "msg")
b = fingerprint("cve", "src/x.py", 10, "F401", "msg")
assert a != b
def test_fingerprint_handles_none_locator_parts():
# Findings without file/line (e.g. CVEs) still get a deterministic hash.
a = fingerprint("cve", None, None, "RUSTSEC-2024-0123", "openssl bad")
b = fingerprint("cve", None, None, "RUSTSEC-2024-0123", "openssl bad")
assert a == b
assert a != fingerprint("cve", None, None, "RUSTSEC-2024-0124", "openssl bad")
def test_fingerprint_shape():
fp = fingerprint("lint", "x", 1, "C", "m")
assert len(fp) == 16
assert re.fullmatch(r"[0-9a-f]{16}", fp)

View file

@ -0,0 +1,36 @@
"""GenericParser fallback unit tests."""
from __future__ import annotations
from crafting_table.parsers.generic import GenericParser
from crafting_table.parsers import find_parser
def test_generic_zero_exit_no_findings():
out = GenericParser.parse("any output", 0, "build")
assert out == []
def test_generic_nonzero_exit_emits_one_finding():
out = GenericParser.parse("oops", 1, "build")
assert len(out) == 1
f = out[0]
assert f.kind == "recipe_fail"
assert f.severity == "warn"
assert f.code == "exit_1"
assert "build" in f.message
assert "1" in f.message
def test_generic_matches_anything():
assert GenericParser.matches("anylang", "anyrecipe") is True
def test_registry_falls_back_to_generic_for_unknown_lang():
cls = find_parser("ruby", "audit")
assert cls is GenericParser
def test_registry_falls_back_to_generic_for_unknown_recipe():
# Rust parser declines unknown recipes; resolver should drop to generic.
cls = find_parser("rust", "deploy")
assert cls is GenericParser

View file

@ -0,0 +1,57 @@
"""GoParser unit tests — go vet + govulncheck."""
from __future__ import annotations
from .conftest import load_fixture
from crafting_table.parsers.go import GoParser, _parse_posn
def test_go_vet_extracts_diagnostics():
raw = load_fixture("go", "go_vet.json")
findings = GoParser.parse(raw, exit_code=1, recipe="lint")
# 2 printf + 1 shadow = 3.
assert len(findings) == 3
by_code: dict[str, int] = {}
for f in findings:
assert f.kind == "lint"
by_code[f.code] = by_code.get(f.code, 0) + 1
assert by_code["printf"] == 2
assert by_code["shadow"] == 1
printf_first = next(f for f in findings if f.code == "printf")
assert printf_first.file == "/work/foo/main.go"
assert printf_first.line == 42
def test_go_vet_garbage_no_findings():
findings = GoParser.parse("nothing useful here", exit_code=0, recipe="lint")
assert findings == []
def test_govulncheck_dedups_by_osv_id():
raw = load_fixture("go", "govulncheck.jsonl")
findings = GoParser.parse(raw, exit_code=3, recipe="audit")
# 3 finding records → 2 unique OSV ids.
assert len(findings) == 2
ids = sorted(f.code for f in findings)
assert ids == ["GO-2023-1989", "GO-2024-2611"]
f0 = next(f for f in findings if f.code == "GO-2023-1989")
assert f0.kind == "cve"
assert f0.suggested_fix and "v0.17.0" in f0.suggested_fix
assert f0.extras["package"] == "golang.org/x/net"
def test_go_audit_clean_log_no_findings():
findings = GoParser.parse('{"config":{"protocol_version":"v1.0.0"}}', exit_code=0, recipe="audit")
assert findings == []
def test_go_build_recipe_falls_through():
f = GoParser.parse("any", exit_code=1, recipe="build")
assert len(f) == 1 and f[0].kind == "recipe_fail"
def test_parse_posn_helper():
assert _parse_posn("/abs/x.go:42:9") == ("/abs/x.go", 42)
assert _parse_posn("rel/y.go:7") == ("rel/y.go", 7)
assert _parse_posn("") == (None, None)

View file

@ -0,0 +1,91 @@
"""PythonParser unit tests — ruff + mypy + pip-audit + pytest."""
from __future__ import annotations
from .conftest import load_fixture
from crafting_table.parsers.python import PythonParser
def test_python_lint_ruff_array():
raw = load_fixture("python", "ruff.json")
findings = PythonParser.parse(raw, exit_code=1, recipe="lint")
# 2 ruff entries → 2 findings.
assert len(findings) == 2
by_code = {f.code: f for f in findings}
assert "F401" in by_code and "E501" in by_code
f401 = by_code["F401"]
assert f401.kind == "lint"
assert f401.severity == "warn"
assert f401.file == "/work/src/app.py"
assert f401.line == 3
# ruff fix.message should map into suggested_fix
assert f401.suggested_fix is not None
def test_python_lint_mypy_jsonl():
raw = load_fixture("python", "mypy.jsonl")
findings = PythonParser.parse(raw, exit_code=1, recipe="lint")
# 2 mypy lines: 1 error (kept), 1 note (still parsed but warn).
assert len(findings) == 2
err = next(f for f in findings if f.severity == "error")
assert err.code == "return-value"
assert err.file == "src/app.py"
assert err.line == 17
def test_python_lint_handles_garbage():
findings = PythonParser.parse("oops not json", exit_code=1, recipe="lint")
assert findings == []
def test_python_audit_pip_audit():
raw = load_fixture("python", "pip_audit.json")
findings = PythonParser.parse(raw, exit_code=1, recipe="audit")
# Only requests has a vuln; fastapi has none.
assert len(findings) == 1
f = findings[0]
assert f.kind == "cve"
assert f.severity == "high"
assert f.code == "PYSEC-2018-28"
assert "requests" in f.message
assert f.suggested_fix == "bump requests to 2.20.1"
assert f.extras["package"] == "requests"
def test_python_audit_clean_log_no_findings():
raw = '{"dependencies":[]}'
findings = PythonParser.parse(raw, exit_code=0, recipe="audit")
assert findings == []
def test_python_test_parses_failed_lines():
raw = load_fixture("python", "pytest.txt")
findings = PythonParser.parse(raw, exit_code=1, recipe="test")
assert len(findings) == 2
codes = sorted(f.code for f in findings)
assert codes == sorted(["tests/test_a.py::test_two", "tests/test_b.py::test_four"])
for f in findings:
assert f.kind == "test_fail"
assert f.severity == "error"
assert f.file is not None
assert f.file.endswith(".py")
def test_python_test_zero_exit_no_findings():
findings = PythonParser.parse("all passed", exit_code=0, recipe="test")
assert findings == []
def test_python_test_nonzero_no_failed_marker_emits_synthetic():
findings = PythonParser.parse("collection error", exit_code=2, recipe="test")
assert len(findings) == 1
assert findings[0].kind == "test_fail"
assert "exit_2" in findings[0].code
def test_python_matches():
assert PythonParser.matches("python", "lint")
assert PythonParser.matches("python", "audit")
assert PythonParser.matches("python", "test")
assert not PythonParser.matches("rust", "lint")

View file

@ -0,0 +1,108 @@
"""RustParser unit tests — driven from fixtures/rust/ samples."""
from __future__ import annotations
from .conftest import load_fixture
from crafting_table.parsers.rust import RustParser
def test_rust_audit_extracts_two_cves():
raw = load_fixture("rust", "cargo_audit.json")
findings = RustParser.parse(raw, exit_code=1, recipe="audit")
assert len(findings) == 2
f1 = findings[0]
assert f1.kind == "cve"
assert f1.code == "RUSTSEC-2024-0123"
assert f1.severity == "high"
assert "openssl" in f1.message
assert "0.10.55" in f1.message
assert "0.10.66" in f1.message
assert f1.suggested_fix is not None
assert "0.10.66" in f1.suggested_fix
assert f1.raw_json is not None
assert f1.extras["package"] == "openssl"
f2 = findings[1]
assert f2.code == "RUSTSEC-2024-0099"
# No patched versions → no suggested_fix
assert f2.suggested_fix is None
def test_rust_audit_clean_log_no_findings():
# No vulnerabilities in the envelope.
raw = '{"vulnerabilities":{"found":false,"count":0,"list":[]}}'
findings = RustParser.parse(raw, exit_code=0, recipe="audit")
assert findings == []
def test_rust_audit_garbage_log_no_findings():
findings = RustParser.parse("not json at all", exit_code=1, recipe="audit")
assert findings == []
def test_rust_clippy_extracts_warning_and_error():
raw = load_fixture("rust", "cargo_clippy.jsonl")
findings = RustParser.parse(raw, exit_code=1, recipe="lint")
# Two compiler-message lines with level in {warning, error}; the "note"
# one should be filtered out.
assert len(findings) == 2
by_code = {f.code: f for f in findings}
assert "unused_variables" in by_code
assert "E0382" in by_code
w = by_code["unused_variables"]
assert w.severity == "warn"
assert w.kind == "lint"
assert w.file == "src/lib.rs"
assert w.line == 12
assert w.suggested_fix is not None
assert "_x" in w.suggested_fix
e = by_code["E0382"]
assert e.severity == "error"
assert e.line == 42
assert e.file == "src/main.rs"
def test_rust_clippy_skips_non_compiler_message_lines():
raw = '{"reason":"build-finished","success":true}\n'
findings = RustParser.parse(raw, exit_code=0, recipe="lint")
assert findings == []
def test_rust_test_parses_failures():
raw = load_fixture("rust", "cargo_test.txt")
findings = RustParser.parse(raw, exit_code=101, recipe="test")
codes = sorted(f.code for f in findings)
assert codes == sorted(["math::tests::adds_negative", "parser::tests::parses_garbage"])
for f in findings:
assert f.kind == "test_fail"
assert f.severity == "error"
def test_rust_test_zero_exit_no_findings():
findings = RustParser.parse("test result: ok. all passed", exit_code=0, recipe="test")
assert findings == []
def test_rust_test_nonzero_no_failed_lines_emits_synthetic():
findings = RustParser.parse("compile error", exit_code=2, recipe="test")
assert len(findings) == 1
assert findings[0].kind == "test_fail"
assert "exit_2" in findings[0].code
def test_rust_build_recipe_falls_through_to_recipe_fail():
findings = RustParser.parse("anything", exit_code=1, recipe="build")
assert len(findings) == 1
assert findings[0].kind == "recipe_fail"
def test_rust_matches_only_rust_recipes():
assert RustParser.matches("rust", "audit")
assert RustParser.matches("rust", "lint")
assert RustParser.matches("rust", "test")
assert RustParser.matches("rust", "build")
assert not RustParser.matches("python", "audit")
assert not RustParser.matches("rust", "deploy")

View file

@ -0,0 +1,56 @@
"""TypeScriptParser unit tests — eslint + tsc."""
from __future__ import annotations
from .conftest import load_fixture
from crafting_table.parsers.typescript import TypeScriptParser
def test_eslint_array_two_messages():
raw = load_fixture("typescript", "eslint.json")
findings = TypeScriptParser.parse(raw, exit_code=1, recipe="lint")
# First file has 2 messages, second has 0.
assert len(findings) == 2
by_code = {f.code: f for f in findings}
assert "no-unused-vars" in by_code
assert "@typescript-eslint/no-explicit-any" in by_code
warn = by_code["no-unused-vars"]
assert warn.severity == "warn"
assert warn.line == 5
assert warn.file == "/work/src/index.ts"
err = by_code["@typescript-eslint/no-explicit-any"]
assert err.severity == "error"
def test_tsc_human_output_parses():
raw = load_fixture("typescript", "tsc.txt")
findings = TypeScriptParser.parse(raw, exit_code=1, recipe="lint")
# 2 errors + 1 warning.
assert len(findings) == 3
codes = sorted(f.code for f in findings)
assert codes == ["TS2304", "TS6133", "TS7006"]
err = next(f for f in findings if f.code == "TS2304")
assert err.severity == "error"
assert err.file == "src/index.ts"
assert err.line == 5
def test_typescript_lint_garbage_no_findings():
findings = TypeScriptParser.parse("nothing", exit_code=0, recipe="lint")
assert findings == []
def test_javascript_alias_handled_by_typescript_parser():
# The registry routes "javascript" to TypeScriptParser.
from crafting_table.parsers import find_parser
from crafting_table.parsers.typescript import TypeScriptParser as TSP
assert find_parser("javascript", "lint") is TSP
def test_ts_audit_falls_through_to_recipe_fail():
f = TypeScriptParser.parse("any", exit_code=1, recipe="audit")
assert len(f) == 1
assert f[0].kind == "recipe_fail"

View file

@ -0,0 +1,196 @@
"""Integration test — runner runs a recipe whose output is parser-shaped, and
findings end up in the DB + visible via GET /jobs/{id}/findings.
We use python:lint with a stub command that emits a known-good ruff JSON
array, then verify:
- findings_count on the job row reflects the parsed entries
- GET /jobs/{id}/findings returns rows with the right kind/code/file/line
- fingerprint is populated and stable across rows for the same locator
"""
from __future__ import annotations
import json
import time
import shutil
import subprocess
from pathlib import Path
import pytest
from tests.conftest import sample_project_payload
def _make_local_git_repo(root: Path) -> str:
if shutil.which("git") is None:
pytest.skip("git binary not present in test environment")
repo = root / "findings-fixture-repo"
repo.mkdir()
subprocess.run(["git", "init", "-q", "-b", "main"], cwd=repo, check=True)
subprocess.run(["git", "config", "user.email", "test@example"], cwd=repo, check=True)
subprocess.run(["git", "config", "user.name", "test"], cwd=repo, check=True)
subprocess.run(["git", "config", "commit.gpgsign", "false"], cwd=repo, check=True)
(repo / "README.md").write_text("hello\n")
subprocess.run(["git", "add", "README.md"], cwd=repo, check=True)
subprocess.run(["git", "commit", "-q", "-m", "initial"], cwd=repo, check=True)
return str(repo)
def _wait_terminal(tc, bearer: str, job_id: str, timeout: float = 30.0) -> dict:
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
r = tc.get(
f"/jobs/{job_id}",
headers={"Authorization": f"Bearer {bearer}"},
)
body = r.json()
if body["job"]["status"] in ("succeeded", "failed", "timed_out", "cancelled"):
return body
time.sleep(0.1)
raise AssertionError("job never finished")
def test_python_lint_findings_persisted_via_runner(client, tmp_path):
"""python:lint with a ruff-shaped JSON stub → 2 findings persisted."""
tc, ctx = client
git_url = _make_local_git_repo(tmp_path)
# Ruff-shaped stub. echo + exit 1 (lint findings → non-zero exit) so
# the recipe terminates the way ruff really would.
ruff_stub = json.dumps(
[
{
"code": "F401",
"message": "'os' imported but unused",
"filename": "src/app.py",
"location": {"row": 3, "column": 1},
},
{
"code": "E501",
"message": "Line too long",
"filename": "src/app.py",
"location": {"row": 42, "column": 89},
},
]
)
# Single-quote the JSON so the shell doesn't interpret double-quotes.
lint_cmd = f"echo '{ruff_stub}'; exit 1"
payload = sample_project_payload(name="ct-findings-py")
payload["git_url"] = git_url
payload["subprojects"][0]["language"] = "python"
payload["subprojects"][0]["lint"] = lint_cmd
payload["subprojects"][0]["timeout_secs"] = 20
r = tc.post(
"/projects",
headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"},
json=payload,
)
assert r.status_code == 200, r.text
r2 = tc.post(
"/projects/ct-findings-py/jobs",
headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"},
json={"recipe": "lint"},
)
assert r2.status_code == 200, r2.text
job_id = r2.json()["job_id"]
final = _wait_terminal(tc, ctx["alpha_bearer"], job_id)
# Recipe exited 1 so the job is "failed" — but parsing still happens.
assert final["job"]["status"] == "failed"
assert final["job"]["exit_code"] == 1
assert final["job"]["findings_count"] == 2
r3 = tc.get(
f"/jobs/{job_id}/findings",
headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"},
)
assert r3.status_code == 200, r3.text
findings = r3.json()["findings"]
assert len(findings) == 2
by_code = {f["code"]: f for f in findings}
assert "F401" in by_code and "E501" in by_code
f401 = by_code["F401"]
assert f401["kind"] == "lint"
assert f401["severity"] == "warn"
assert f401["file"] == "src/app.py"
assert f401["line"] == 3
assert f401["fingerprint"]
assert len(f401["fingerprint"]) == 16
def test_unknown_lang_recipe_falls_back_to_generic(client, tmp_path):
"""A recipe with no parser registered emits exactly one recipe_fail
finding when it exits non-zero, and zero findings when it exits 0."""
tc, ctx = client
git_url = _make_local_git_repo(tmp_path)
# Use `ruby` which has no parser; our PythonParser etc. all decline.
payload = sample_project_payload(name="ct-findings-generic")
payload["git_url"] = git_url
payload["subprojects"][0]["language"] = "ruby"
payload["subprojects"][0]["audit"] = "echo audit-output; exit 5"
payload["subprojects"][0]["timeout_secs"] = 20
r = tc.post(
"/projects",
headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"},
json=payload,
)
assert r.status_code == 200
r2 = tc.post(
"/projects/ct-findings-generic/jobs",
headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"},
json={"recipe": "audit"},
)
job_id = r2.json()["job_id"]
final = _wait_terminal(tc, ctx["alpha_bearer"], job_id)
assert final["job"]["status"] == "failed"
assert final["job"]["findings_count"] == 1
r3 = tc.get(
f"/jobs/{job_id}/findings",
headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"},
)
rows = r3.json()["findings"]
assert len(rows) == 1
assert rows[0]["kind"] == "recipe_fail"
assert rows[0]["code"] == "exit_5"
def test_clean_recipe_produces_zero_findings(client, tmp_path):
"""Successful run with no parseable signal → no findings rows, count=0."""
tc, ctx = client
git_url = _make_local_git_repo(tmp_path)
payload = sample_project_payload(name="ct-findings-clean")
payload["git_url"] = git_url
payload["subprojects"][0]["language"] = "python"
# Empty ruff JSON array → 0 findings, exit 0.
payload["subprojects"][0]["lint"] = "echo '[]'; exit 0"
payload["subprojects"][0]["timeout_secs"] = 20
r = tc.post(
"/projects",
headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"},
json=payload,
)
assert r.status_code == 200
r2 = tc.post(
"/projects/ct-findings-clean/jobs",
headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"},
json={"recipe": "lint"},
)
job_id = r2.json()["job_id"]
final = _wait_terminal(tc, ctx["alpha_bearer"], job_id)
assert final["job"]["status"] == "succeeded"
assert final["job"]["findings_count"] == 0
r3 = tc.get(
f"/jobs/{job_id}/findings",
headers={"Authorization": f"Bearer {ctx['alpha_bearer']}"},
)
assert r3.json()["findings"] == []