- parsers/ package: rust / python / go / typescript / generic
- parser registry with language+recipe -> fallback resolution
- fingerprint hash (kind+file+line+code) for cross-run dedup
- runner.py post-exec hook: parse log, persist findings, count on job row
(extraction runs before mark_job_finished so callers polling on terminal
status see findings_count populated atomically)
- db.insert_finding / list_findings / increment_findings_count DAOs already
shipped in wave 1; wired here
- GET /jobs/{id}/findings now returns real data (server route already
shipped; was returning empty list because nothing populated the table)
- tests/test_parsers/: 6 modules + 11 fixtures (rust/python/go/typescript)
- tests/test_runner_findings.py: 3 integration tests
- README: tick steps 2-6, add Findings section
Suite: 108 passing (62 wave-1 + 46 new).
Spec: memory/spec-crafting-table.md
172 lines
6.1 KiB
Python
172 lines
6.1 KiB
Python
"""Go parser — go vet / govulncheck.

Recipes:

- ``lint`` → ``go vet -json`` writes a per-package JSON envelope to
  stderr, keyed by package name → analyzer name → list of diagnostics.
- ``audit`` → ``govulncheck -json`` emits NDJSON; we filter for
  ``message.finding`` records that carry an OSV id and trace.
- ``test`` / ``build`` → the log is not parsed; a single ``recipe_fail``
  finding is emitted when the exit code is non-zero.
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
from .base import Finding, _iter_jsonl, _safe_json_loads
|
|
|
|
|
|
class GoParser:
    """Turn ``go vet`` / ``govulncheck`` logs into ``Finding`` records.

    Dispatch by recipe:

    - ``lint``  -> ``_parse_govet``
    - ``audit`` -> ``_parse_govulncheck``
    - anything else -> one ``recipe_fail`` finding when the exit code is
      non-zero, otherwise no findings.
    """

    @classmethod
    def matches(cls, language: str, recipe: str) -> bool:
        """Return True when ``language`` is ``go`` and ``recipe`` is one of
        ``audit`` / ``lint`` / ``test`` / ``build``."""
        return language == "go" and recipe in {"audit", "lint", "test", "build"}

    @classmethod
    def parse(cls, raw_log: str, exit_code: int, recipe: str) -> list[Finding]:
        """Parse ``raw_log`` for ``recipe`` and return the findings.

        ``exit_code`` is only consulted for recipes without a dedicated
        log parser; ``lint`` and ``audit`` rely on the log content alone.
        """
        if recipe == "lint":
            return cls._parse_govet(raw_log)
        if recipe == "audit":
            return cls._parse_govulncheck(raw_log)
        if exit_code != 0:
            return [
                Finding(
                    kind="recipe_fail",
                    severity="warn",
                    code=f"exit_{exit_code}",
                    message=f"go {recipe} exited with status {exit_code}",
                )
            ]
        return []

    # ---- go vet ------------------------------------------------------------

    @staticmethod
    def _json_objects(text: str) -> list[dict]:
        """Return every top-level JSON object embedded in ``text``, in order.

        Non-JSON chatter between objects is skipped. A ``{`` that does not
        start a valid JSON value is ignored and scanning resumes at the
        next ``{``. Objects nested inside an already-decoded object are
        not reported separately because scanning resumes after its end.
        """
        decoder = json.JSONDecoder()
        found: list[dict] = []
        pos = text.find("{")
        while pos != -1:
            try:
                value, end = decoder.raw_decode(text, pos)
            except (ValueError, RecursionError):
                # Not JSON at this brace (or absurdly nested): try the next one.
                pos = text.find("{", pos + 1)
                continue
            if isinstance(value, dict):
                found.append(value)
            pos = text.find("{", end)
        return found

    @classmethod
    def _parse_govet(cls, raw_log: str) -> list[Finding]:
        """``go vet -json`` envelope:

            {"package/path": {"analyzer-name": [{posn, message, ...}, ...]}}

        ``posn`` looks like ``/abs/path/file.go:LINE:COL``.

        The log interleaves `go vet`'s usual chatter with the JSON, and a
        multi-package run prints one envelope per package (an empty ``{}``
        for a clean package), so every top-level object in the log is
        walked rather than just the first one. Entries whose shape does
        not match the envelope are skipped silently.
        """
        out: list[Finding] = []
        for envelope in cls._json_objects(raw_log):
            for pkg, analyzers in envelope.items():
                if not isinstance(analyzers, dict):
                    continue
                for analyzer, items in analyzers.items():
                    if not isinstance(items, list):
                        continue
                    for diag in items:
                        if not isinstance(diag, dict):
                            continue
                        posn = diag.get("posn") or ""
                        file, line = _parse_posn(posn)
                        out.append(
                            Finding(
                                kind="lint",
                                severity="warn",
                                file=file,
                                line=line,
                                code=analyzer,
                                message=diag.get("message") or "",
                                raw_json=json.dumps(
                                    {"package": pkg, "analyzer": analyzer, "diag": diag}
                                ),
                            )
                        )
        return out

    # ---- govulncheck -------------------------------------------------------

    @classmethod
    def _parse_govulncheck(cls, raw_log: str) -> list[Finding]:
        """govulncheck -json emits NDJSON. The records of interest are
        ``{"finding": {...}}`` carrying an ``osv`` id; we also accept
        ``{"vulnerability": {...}}`` which older versions emit.

        Dedup by OSV id since one vuln may be reported per call site; only
        the first record for each id is kept. Records without a usable
        string id are skipped.
        """
        seen: set[str] = set()
        out: list[Finding] = []
        for obj in _iter_jsonl(raw_log):
            if not isinstance(obj, dict):
                continue
            finding = obj.get("finding") or obj.get("vulnerability")
            if not isinstance(finding, dict):
                continue
            osv = finding.get("osv") or finding.get("id") or finding.get("OSV")
            if isinstance(osv, dict):
                # NOTE(review): presumably older output embeds the whole OSV
                # entry here instead of just its id — confirm against a real
                # log. Either way a dict must not reach the set lookup below
                # (it is unhashable and would raise TypeError).
                osv = osv.get("id")
            if not isinstance(osv, str) or not osv or osv in seen:
                continue
            seen.add(osv)
            mod = finding.get("module") or finding.get("Module") or "?"
            fix = finding.get("fixed_version") or finding.get("FixedVersion")
            summary = (
                finding.get("summary")
                or finding.get("Summary")
                or finding.get("description")
                or "vulnerability"
            )
            out.append(
                Finding(
                    kind="cve",
                    severity="high",
                    code=osv,
                    message=f"{mod}: {summary}" + (f" — fixed in {fix}" if fix else ""),
                    suggested_fix=f"bump {mod} to {fix}" if fix else None,
                    raw_json=json.dumps(obj),
                    extras={"package": mod, "fixed_in": fix, "advisory": osv},
                )
            )
        return out
|
|
|
|
|
|
def _parse_posn(posn: str) -> tuple[str | None, int | None]:
|
|
"""Parse ``/path/to/file.go:LINE:COL`` (or ``file.go:LINE``) into
|
|
(file, line). Returns (None, None) on garbage input."""
|
|
if not posn:
|
|
return None, None
|
|
# Walk from the right so the first colon hit is the column or line.
|
|
parts = posn.rsplit(":", 2)
|
|
if len(parts) == 3:
|
|
file, line_s, _col = parts
|
|
try:
|
|
return file, int(line_s)
|
|
except ValueError:
|
|
return file, None
|
|
if len(parts) == 2:
|
|
file, line_s = parts
|
|
try:
|
|
return file, int(line_s)
|
|
except ValueError:
|
|
return file, None
|
|
return posn, None
|
|
|
|
|
|
def _extract_json_object(text: str) -> dict | None:
|
|
"""Find the first balanced ``{...}`` block and json.loads it."""
|
|
start = text.find("{")
|
|
while start != -1:
|
|
depth = 0
|
|
in_str = False
|
|
esc = False
|
|
for i in range(start, len(text)):
|
|
c = text[i]
|
|
if in_str:
|
|
if esc:
|
|
esc = False
|
|
elif c == "\\":
|
|
esc = True
|
|
elif c == '"':
|
|
in_str = False
|
|
continue
|
|
if c == '"':
|
|
in_str = True
|
|
elif c == "{":
|
|
depth += 1
|
|
elif c == "}":
|
|
depth -= 1
|
|
if depth == 0:
|
|
candidate = text[start : i + 1]
|
|
parsed = _safe_json_loads(candidate)
|
|
if isinstance(parsed, dict):
|
|
return parsed
|
|
break
|
|
start = text.find("{", start + 1)
|
|
return None
|