- parsers/ package: rust / python / go / typescript / generic
- parser registry with language+recipe -> fallback resolution
- fingerprint hash (kind+file+line+code) for cross-run dedup
- runner.py post-exec hook: parse log, persist findings, count on job row
(extraction runs before mark_job_finished so callers polling on terminal
status see findings_count populated atomically)
- db.insert_finding / list_findings / increment_findings_count DAOs already
shipped in wave 1; wired here
- GET /jobs/{id}/findings now returns real data (server route already
shipped; was returning empty list because nothing populated the table)
- tests/test_parsers/: 6 modules + 11 fixtures (rust/python/go/typescript)
- tests/test_runner_findings.py: 3 integration tests
- README: tick steps 2-6, add Findings section
Suite: 108 passing (62 wave-1 + 46 new).
Spec: memory/spec-crafting-table.md
277 lines
9.7 KiB
Python
277 lines
9.7 KiB
Python
"""Python parser — ruff / mypy / pip-audit / pytest.

Recipes:

- ``lint`` — the log is scanned for both ruff output (a top-level JSON
  array) and mypy ``--output=json`` output (JSON-lines). A user's lint
  recipe is often ``ruff check . && mypy ...``, so both shapes may appear
  in the same log; findings from both are returned.
- ``audit`` — pip-audit -f json. Top-level object with ``dependencies[]``
  each carrying ``vulns[]``.
- ``test`` — pytest. Parse ``FAILED tests/...::name - reason`` lines.
- ``build`` — no structured parsing; pip install / setup.py output isn't a
  useful structured channel, so only a non-zero exit status is reported.
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
|
|
from .base import Finding, _iter_jsonl, _safe_json_loads
|
|
|
|
|
|
class PythonParser:
    """Convert raw logs of Python tooling recipes into ``Finding`` records.

    Supported recipes are ``lint`` (ruff + mypy), ``audit`` (pip-audit),
    ``test`` (pytest) and ``build`` (exit status only). All entry points are
    classmethods; the class carries no instance state.
    """

    # pytest short-summary line: ``FAILED <nodeid> - <reason>`` where the
    # node id contains no whitespace.
    _PYTEST_FAILED_RE = re.compile(r"^FAILED\s+(\S+)\s*(?:-\s*(.+))?$")

    # Same line shape, but for parametrized node ids whose ``[...]`` suffix
    # may itself contain whitespace or " - " (e.g. ``test_x[a b]``). The
    # ``::`` and the bracket glued to the id keep ordinary log lines such as
    # "FAILED to connect [x]" from matching.
    _PYTEST_FAILED_PARAM_RE = re.compile(
        r"^FAILED\s+(\S*::\S*?\[.*?\])(?:\s+-\s+(.+))?$"
    )

    @classmethod
    def matches(cls, language: str, recipe: str) -> bool:
        """Return True when this parser handles ``language``/``recipe``."""
        return language == "python" and recipe in {"audit", "lint", "test", "build"}

    @classmethod
    def parse(cls, raw_log: str, exit_code: int, recipe: str) -> list[Finding]:
        """Dispatch ``raw_log`` to the parser for ``recipe``.

        Any recipe other than ``lint``/``audit``/``test`` is treated as a
        build: the log is not inspected and a single ``recipe_fail`` finding
        is produced only when ``exit_code`` is non-zero.
        """
        if recipe == "lint":
            return cls._parse_lint(raw_log)
        if recipe == "audit":
            return cls._parse_pip_audit(raw_log)
        if recipe == "test":
            return cls._parse_pytest(raw_log, exit_code)
        # build
        if exit_code != 0:
            return [
                Finding(
                    kind="recipe_fail",
                    severity="warn",
                    code=f"exit_{exit_code}",
                    message=f"python build exited with status {exit_code}",
                )
            ]
        return []

    # ---- lint --------------------------------------------------------------

    @classmethod
    def _parse_lint(cls, raw_log: str) -> list[Finding]:
        """Collect ruff and mypy findings from the same log.

        ruff emits a JSON array; mypy --output=json emits JSON-lines. Both
        are common in one lint recipe (often `ruff && mypy`), so both parsers
        always run and their results are concatenated, ruff first.
        """
        return [*cls._parse_ruff(raw_log), *cls._parse_mypy(raw_log)]

    @classmethod
    def _parse_ruff(cls, raw_log: str) -> list[Finding]:
        """Parse the first JSON array in the log as ruff diagnostics.

        Array entries that are not objects, or that lack a ``code`` or
        ``message`` key, are skipped.
        """
        # The array might be preceded by a banner / shell echo.
        arr = _extract_json_array(raw_log)
        if not isinstance(arr, list):
            return []
        out: list[Finding] = []
        for item in arr:
            if not isinstance(item, dict):
                continue
            # Ruff entries: {code, message, filename, location:{row,column}, ...}
            if "code" not in item or "message" not in item:
                continue
            loc = item.get("location") or {}
            row = loc.get("row") if isinstance(loc, dict) else None
            fix = item.get("fix") or {}
            suggested = None
            if isinstance(fix, dict):
                # Prefer the human-readable fix message; fall back to the
                # applicability label when no message is present.
                suggested = fix.get("message") or fix.get("applicability")
            out.append(
                Finding(
                    kind="lint",
                    severity="warn",
                    file=item.get("filename"),
                    line=row,
                    code=item.get("code"),
                    message=item.get("message") or "",
                    suggested_fix=suggested,
                    raw_json=json.dumps(item),
                )
            )
        return out

    @classmethod
    def _parse_mypy(cls, raw_log: str) -> list[Finding]:
        """Parse mypy JSON-lines diagnostics found anywhere in the log.

        Only objects carrying ``file``, ``message`` and ``severity`` keys are
        treated as mypy output. ``severity == "error"`` maps to ``error``;
        every other value (e.g. ``note``) maps to ``warn``.
        """
        out: list[Finding] = []
        for obj in _iter_jsonl(raw_log):
            if not isinstance(obj, dict):
                continue
            # mypy JSON-line: {"file":..., "line":..., "column":...,
            #                  "severity":"error"|"note", "message":...,
            #                  "code":...}
            if "file" not in obj or "message" not in obj or "severity" not in obj:
                continue
            sev = "error" if obj.get("severity") == "error" else "warn"
            out.append(
                Finding(
                    kind="lint",
                    severity=sev,
                    file=obj.get("file"),
                    line=obj.get("line"),
                    code=obj.get("code") or "mypy",
                    message=obj.get("message") or "",
                    raw_json=json.dumps(obj),
                )
            )
        return out

    # ---- pip-audit ---------------------------------------------------------

    @staticmethod
    def _fix_versions(vuln: dict) -> list[str]:
        """Return ``vuln["fix_versions"]`` as a clean list of strings.

        A bare string is treated as a single version, any other non-sequence
        value as "no fix", and ``None``/empty entries are dropped, so the
        caller can ``", ".join(...)`` the result without raising.
        """
        raw = vuln.get("fix_versions")
        if isinstance(raw, str):
            raw = [raw]
        if not isinstance(raw, (list, tuple)):
            return []
        return [str(v) for v in raw if v is not None and v != ""]

    @classmethod
    def _parse_pip_audit(cls, raw_log: str) -> list[Finding]:
        """Parse ``pip-audit -f json`` output into one finding per vuln.

        Expected shape:
          {"dependencies":[{"name":..., "version":...,
                            "vulns":[{"id":..., "fix_versions":[...],
                                      "description":...}]}]}

        Malformed pieces (non-list ``dependencies``/``vulns``, non-object
        entries) are skipped instead of raising.
        """
        envelope = _extract_json_object(raw_log)
        if envelope is None:
            return []
        deps = envelope.get("dependencies")
        if not isinstance(deps, list):
            return []
        out: list[Finding] = []
        for dep in deps:
            if not isinstance(dep, dict):
                continue
            pkg = dep.get("name") or "?"
            ver = dep.get("version") or "?"
            vulns = dep.get("vulns")
            if not isinstance(vulns, list):
                continue
            for vuln in vulns:
                if not isinstance(vuln, dict):
                    continue
                vid = vuln.get("id") or "PYSEC-?"
                desc = vuln.get("description") or "vulnerability"
                fixes = cls._fix_versions(vuln)
                fix_str = ", ".join(fixes) if fixes else "no fix available"
                out.append(
                    Finding(
                        kind="cve",
                        severity="high",
                        code=vid,
                        message=f"{pkg} {ver}: {desc} — fixed in {fix_str}",
                        suggested_fix=(
                            f"bump {pkg} to {fixes[0]}" if fixes else None
                        ),
                        raw_json=json.dumps({"dep": dep, "vuln": vuln}),
                        extras={
                            "package": pkg,
                            "version": ver,
                            "fixed_in": fixes,
                            "advisory": vid,
                        },
                    )
                )
        return out

    # ---- pytest ------------------------------------------------------------

    @classmethod
    def _split_failed_line(cls, line: str) -> tuple[str, str] | None:
        """Split one pytest ``FAILED`` summary line into (node id, reason).

        Returns None when the line is not a ``FAILED`` line. The reason is
        an empty string when the line carries none. The bracket-aware
        pattern is tried first so that parametrized ids containing spaces or
        " - " are kept whole; everything else goes through the plain pattern.
        """
        text = line.strip()
        m = cls._PYTEST_FAILED_PARAM_RE.match(text) or cls._PYTEST_FAILED_RE.match(text)
        if m is None:
            return None
        return m.group(1), (m.group(2) or "").strip()

    @classmethod
    def _parse_pytest(cls, raw_log: str, exit_code: int) -> list[Finding]:
        """Turn pytest ``FAILED`` lines into ``test_fail`` findings.

        Returns nothing when ``exit_code`` is 0. Each node id is reported
        once even if it appears on several lines. When the run failed but no
        ``FAILED`` line was found, a single generic finding keyed on the exit
        status is returned instead.
        """
        if exit_code == 0:
            return []
        out: list[Finding] = []
        seen: set[str] = set()
        for line in raw_log.splitlines():
            parts = cls._split_failed_line(line)
            if parts is None:
                continue
            name, reason = parts
            if name in seen:
                continue
            seen.add(name)
            # Split file::test_name to fill `file` column when possible.
            file: str | None = None
            if "::" in name:
                file = name.split("::", 1)[0]
            out.append(
                Finding(
                    kind="test_fail",
                    severity="error",
                    file=file,
                    code=name,
                    message=f"pytest {name} failed" + (f": {reason}" if reason else ""),
                )
            )
        if not out:
            out.append(
                Finding(
                    kind="test_fail",
                    severity="error",
                    code=f"exit_{exit_code}",
                    message=(
                        f"pytest exited {exit_code} but no FAILED lines "
                        "detected; test process exited non-zero"
                    ),
                )
            )
        return out
|
|
|
|
|
|
def _extract_json_array(text: str) -> list | None:
|
|
"""Find the first balanced ``[...]`` block and json.loads it."""
|
|
start = text.find("[")
|
|
while start != -1:
|
|
depth = 0
|
|
in_str = False
|
|
esc = False
|
|
for i in range(start, len(text)):
|
|
c = text[i]
|
|
if in_str:
|
|
if esc:
|
|
esc = False
|
|
elif c == "\\":
|
|
esc = True
|
|
elif c == '"':
|
|
in_str = False
|
|
continue
|
|
if c == '"':
|
|
in_str = True
|
|
elif c == "[":
|
|
depth += 1
|
|
elif c == "]":
|
|
depth -= 1
|
|
if depth == 0:
|
|
candidate = text[start : i + 1]
|
|
parsed = _safe_json_loads(candidate)
|
|
if isinstance(parsed, list):
|
|
return parsed
|
|
break
|
|
start = text.find("[", start + 1)
|
|
return None
|
|
|
|
|
|
def _extract_json_object(text: str) -> dict | None:
|
|
"""Like _extract_json_array but for objects."""
|
|
start = text.find("{")
|
|
while start != -1:
|
|
depth = 0
|
|
in_str = False
|
|
esc = False
|
|
for i in range(start, len(text)):
|
|
c = text[i]
|
|
if in_str:
|
|
if esc:
|
|
esc = False
|
|
elif c == "\\":
|
|
esc = True
|
|
elif c == '"':
|
|
in_str = False
|
|
continue
|
|
if c == '"':
|
|
in_str = True
|
|
elif c == "{":
|
|
depth += 1
|
|
elif c == "}":
|
|
depth -= 1
|
|
if depth == 0:
|
|
candidate = text[start : i + 1]
|
|
parsed = _safe_json_loads(candidate)
|
|
if isinstance(parsed, dict):
|
|
return parsed
|
|
break
|
|
start = text.find("{", start + 1)
|
|
return None
|