crafting-table/crafting_table/parsers/python.py
Kayos d467b2f5be v0.1 wave 2A (steps 5+6): per-language parsers + findings extraction
- parsers/ package: rust / python / go / typescript / generic
- parser registry with language+recipe -> fallback resolution
- fingerprint hash (kind+file+line+code) for cross-run dedup
- runner.py post-exec hook: parse log, persist findings, count on job row
  (extraction runs before mark_job_finished so callers polling on terminal
  status see findings_count populated atomically)
- db.insert_finding / list_findings / increment_findings_count DAOs already
  shipped in wave 1; wired here
- GET /jobs/{id}/findings now returns real data (server route already
  shipped; was returning empty list because nothing populated the table)
- tests/test_parsers/: 6 modules + 11 fixtures (rust/python/go/typescript)
- tests/test_runner_findings.py: 3 integration tests
- README: tick steps 2-6, add Findings section

Suite: 108 passing (62 wave-1 + 46 new).
Spec: memory/spec-crafting-table.md
2026-04-29 08:36:16 -07:00

277 lines
9.7 KiB
Python

"""Python parser — ruff / mypy / pip-audit / pytest.
Recipes:
- ``lint`` — parse ruff JSON (a top-level array) and mypy JSON-lines. Both
parsers always run over the same log and their findings are concatenated,
so a lint recipe such as ``ruff check . && mypy ...`` is handled even when
the two shapes are interleaved in the same log.
- ``audit`` — pip-audit -f json. Top-level object with ``dependencies[]``
each carrying ``vulns[]``.
- ``test`` — pytest. Parse ``FAILED tests/...::name - reason`` lines.
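- ``build`` — no structured parsing; pip install / setup.py output isn't a
useful structured channel, so a non-zero exit code yields a single
``recipe_fail`` finding and a zero exit code yields nothing.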
"""
from __future__ import annotations
import json
import re
from .base import Finding, _iter_jsonl, _safe_json_loads
class PythonParser:
    """Turn raw logs from Python tooling into ``Finding`` records.

    Dispatch is by recipe: ``lint`` (ruff JSON array plus mypy JSON-lines),
    ``audit`` (pip-audit JSON), ``test`` (pytest ``FAILED`` summary lines)
    and ``build`` (exit status only). Every entry point is a classmethod or
    staticmethod; the class holds no instance state.
    """

    @classmethod
    def matches(cls, language: str, recipe: str) -> bool:
        """Return True when this parser handles the language/recipe pair."""
        return language == "python" and recipe in {"audit", "lint", "test", "build"}

    @classmethod
    def parse(cls, raw_log: str, exit_code: int, recipe: str) -> list[Finding]:
        """Extract findings from ``raw_log`` for the given recipe.

        Args:
            raw_log: Full captured output of the recipe.
            exit_code: Exit status of the recipe process.
            recipe: ``lint``, ``audit`` or ``test``; any other value is
                treated as ``build``.

        Returns:
            The findings extracted from the log (possibly empty).
        """
        if recipe == "lint":
            return cls._parse_lint(raw_log)
        if recipe == "audit":
            return cls._parse_pip_audit(raw_log)
        if recipe == "test":
            return cls._parse_pytest(raw_log, exit_code)
        # build: the log is not parsed; only a non-zero exit is reported.
        if exit_code != 0:
            return [
                Finding(
                    kind="recipe_fail",
                    severity="warn",
                    code=f"exit_{exit_code}",
                    message=f"python build exited with status {exit_code}",
                )
            ]
        return []

    # ---- lint --------------------------------------------------------------
    @classmethod
    def _parse_lint(cls, raw_log: str) -> list[Finding]:
        """Collect ruff and mypy findings from one lint log.

        ruff emits a JSON array; mypy --output=json emits JSON-lines. Both
        are common in a lint recipe (often `ruff && mypy`), so both parsers
        always run and their results are concatenated, ruff first.
        """
        out: list[Finding] = []
        out.extend(cls._parse_ruff(raw_log))
        out.extend(cls._parse_mypy(raw_log))
        return out

    @classmethod
    def _parse_ruff(cls, raw_log: str) -> list[Finding]:
        """Parse the first JSON array found in the log as ruff output."""
        # Find the JSON array — it might be preceded by a banner / shell echo.
        arr = _extract_json_array(raw_log)
        if not isinstance(arr, list):
            return []
        out: list[Finding] = []
        for item in arr:
            if not isinstance(item, dict):
                continue
            # Ruff entries: {code, message, filename, location:{row,column}, ...}
            # Entries lacking either key are not ruff-shaped and are skipped.
            if "code" not in item or "message" not in item:
                continue
            loc = item.get("location") or {}
            row = loc.get("row") if isinstance(loc, dict) else None
            fix = item.get("fix") or {}
            suggested = None
            if isinstance(fix, dict):
                # Prefer the human-readable fix message; fall back to the
                # applicability label when no message is present.
                suggested = fix.get("message") or fix.get("applicability")
            out.append(
                Finding(
                    kind="lint",
                    severity="warn",
                    file=item.get("filename"),
                    line=row,
                    code=item.get("code"),
                    message=item.get("message") or "",
                    suggested_fix=suggested,
                    raw_json=json.dumps(item),
                )
            )
        return out

    @classmethod
    def _parse_mypy(cls, raw_log: str) -> list[Finding]:
        """Parse mypy JSON-lines records found anywhere in the log."""
        out: list[Finding] = []
        for obj in _iter_jsonl(raw_log):
            if not isinstance(obj, dict):
                continue
            # mypy JSON-line: {"file":..., "line":..., "column":...,
            #                  "severity":"error"|"note", "message":...,
            #                  "code":...}
            if "file" not in obj or "message" not in obj or "severity" not in obj:
                continue
            # Only "error" maps to error; everything else is a warning.
            sev = "error" if obj.get("severity") == "error" else "warn"
            out.append(
                Finding(
                    kind="lint",
                    severity=sev,
                    file=obj.get("file"),
                    line=obj.get("line"),
                    code=obj.get("code") or "mypy",
                    message=obj.get("message") or "",
                    raw_json=json.dumps(obj),
                )
            )
        return out

    # ---- pip-audit ---------------------------------------------------------
    @staticmethod
    def _normalize_fix_versions(raw: object) -> list[str]:
        """Coerce a ``fix_versions`` value into a list of version strings.

        Anything that is not a list or a non-empty string yields ``[]`` so
        malformed input never raises while formatting a finding.
        """
        if isinstance(raw, str):
            return [raw] if raw else []
        if isinstance(raw, list):
            return [str(version) for version in raw if version is not None]
        return []

    @classmethod
    def _parse_pip_audit(cls, raw_log: str) -> list[Finding]:
        """Parse ``pip-audit -f json`` output into one finding per vuln.

        Expected shape:
            {"dependencies":[{"name":..., "version":...,
                              "vulns":[{"id":..., "fix_versions":[...],
                                        "description":...}]}]}

        Malformed sections (non-list ``dependencies`` / ``vulns``, non-dict
        entries) are skipped rather than raising.
        """
        envelope = _extract_json_object(raw_log)
        if envelope is None:
            return []
        deps = envelope.get("dependencies")
        if not isinstance(deps, list):
            return []
        out: list[Finding] = []
        for dep in deps:
            if not isinstance(dep, dict):
                continue
            vulns = dep.get("vulns")
            if not isinstance(vulns, list):
                continue
            pkg = dep.get("name") or "?"
            ver = dep.get("version") or "?"
            for vuln in vulns:
                if not isinstance(vuln, dict):
                    continue
                vid = vuln.get("id") or "PYSEC-?"
                desc = vuln.get("description") or "vulnerability"
                fixes = cls._normalize_fix_versions(vuln.get("fix_versions"))
                if fixes:
                    tail = f"fixed in {', '.join(fixes)}"
                    suggestion = f"bump {pkg} to {fixes[0]}"
                else:
                    # Avoid the nonsensical "fixed in no fix available".
                    tail = "no fix available"
                    suggestion = None
                out.append(
                    Finding(
                        kind="cve",
                        severity="high",
                        code=vid,
                        message=f"{pkg} {ver}: {desc} — {tail}",
                        suggested_fix=suggestion,
                        raw_json=json.dumps({"dep": dep, "vuln": vuln}),
                        extras={
                            "package": pkg,
                            "version": ver,
                            "fixed_in": fixes,
                            "advisory": vid,
                        },
                    )
                )
        return out

    # ---- pytest ------------------------------------------------------------
    _PYTEST_FAILED_RE = re.compile(r"^FAILED\s+(\S+)\s*(?:-\s*(.+))?$")
    # Parametrized node ids may contain whitespace inside the brackets
    # (e.g. ``test_x[hello world]``), which ``\S+`` above cannot capture.
    # This pattern takes everything up to a ``]`` that is followed by
    # `` - reason`` or the end of the line.
    _PYTEST_FAILED_PARAM_RE = re.compile(
        r"^FAILED\s+(\S+\[.*?\])(?:\s+-\s+(.+))?$"
    )

    @classmethod
    def _split_pytest_failed(cls, line: str) -> tuple[str, str] | None:
        """Split a pytest ``FAILED`` summary line into ``(node_id, reason)``.

        Returns ``None`` when the line is not a ``FAILED`` summary line.
        ``reason`` is ``""`` when the line carries none.
        """
        stripped = line.strip()
        # Bracket-aware pattern first so ids with spaces or hyphens inside
        # the brackets are kept whole; the plain pattern covers the rest.
        m = cls._PYTEST_FAILED_PARAM_RE.match(stripped)
        if m is None:
            m = cls._PYTEST_FAILED_RE.match(stripped)
        if m is None:
            return None
        return m.group(1), (m.group(2) or "").strip()

    @classmethod
    def _parse_pytest(cls, raw_log: str, exit_code: int) -> list[Finding]:
        """Build one finding per distinct ``FAILED`` line of a pytest log.

        A zero exit code yields no findings. A non-zero exit code with no
        recognizable ``FAILED`` line yields a single generic finding so the
        failure is still surfaced.
        """
        if exit_code == 0:
            return []
        out: list[Finding] = []
        seen: set[str] = set()
        for line in raw_log.splitlines():
            parts = cls._split_pytest_failed(line)
            if parts is None:
                continue
            name, reason = parts
            if name in seen:
                continue
            seen.add(name)
            # Split file::test_name to fill `file` column when possible.
            file: str | None = None
            if "::" in name:
                file = name.split("::", 1)[0]
            out.append(
                Finding(
                    kind="test_fail",
                    severity="error",
                    file=file,
                    code=name,
                    message=f"pytest {name} failed" + (f": {reason}" if reason else ""),
                )
            )
        if not out:
            out.append(
                Finding(
                    kind="test_fail",
                    severity="error",
                    code=f"exit_{exit_code}",
                    message=(
                        f"pytest exited {exit_code} but no FAILED lines "
                        f"detected; test process exited non-zero"
                    ),
                )
            )
        return out
def _extract_json_array(text: str) -> list | None:
    """Return the first bracket-balanced ``[...]`` span that decodes to a list.

    Every ``[`` in ``text`` is tried in order as a possible start. From each
    start the text is scanned, ignoring brackets inside double-quoted
    strings, until the brackets balance; that span is handed to
    ``_safe_json_loads``. If the result is not a list, scanning resumes
    from the next ``[``. Returns ``None`` when no candidate decodes to a
    list.
    """
    begin = text.find("[")
    while begin >= 0:
        nesting = 0
        inside_string = False
        escaped = False
        for pos in range(begin, len(text)):
            ch = text[pos]
            if inside_string:
                # Within a string only escapes and the closing quote matter.
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    inside_string = False
            elif ch == '"':
                inside_string = True
            elif ch == "[":
                nesting += 1
            elif ch == "]":
                nesting -= 1
                if nesting == 0:
                    # Brackets balanced: this is the candidate span.
                    decoded = _safe_json_loads(text[begin : pos + 1])
                    if isinstance(decoded, list):
                        return decoded
                    # Not a list; give up on this start position.
                    break
        begin = text.find("[", begin + 1)
    return None
def _extract_json_object(text: str) -> dict | None:
    """Return the first brace-balanced ``{...}`` span that decodes to a dict.

    Every ``{`` in ``text`` is tried in order as a possible start. From each
    start the text is scanned, ignoring braces inside double-quoted strings,
    until the braces balance; that span is handed to ``_safe_json_loads``.
    If the result is not a dict, scanning resumes from the next ``{``.
    Returns ``None`` when no candidate decodes to a dict.
    """
    begin = text.find("{")
    while begin >= 0:
        nesting = 0
        inside_string = False
        escaped = False
        for pos in range(begin, len(text)):
            ch = text[pos]
            if inside_string:
                # Within a string only escapes and the closing quote matter.
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    inside_string = False
            elif ch == '"':
                inside_string = True
            elif ch == "{":
                nesting += 1
            elif ch == "}":
                nesting -= 1
                if nesting == 0:
                    # Braces balanced: this is the candidate span.
                    decoded = _safe_json_loads(text[begin : pos + 1])
                    if isinstance(decoded, dict):
                        return decoded
                    # Not a dict; give up on this start position.
                    break
        begin = text.find("{", begin + 1)
    return None