"""Python parser — ruff / mypy / pip-audit / pytest. Recipes: - ``lint`` — try ruff JSON first (a top-level array), fall back to mypy JSON-lines if the array parse fails. A user's lint recipe can also be ``ruff check . && mypy ...`` and we handle both shapes interleaved in the same log. - ``audit`` — pip-audit -f json. Top-level object with ``dependencies[]`` each carrying ``vulns[]``. - ``test`` — pytest. Parse ``FAILED tests/...::name - reason`` lines. - ``build`` — defer; pip install / setup.py output isn't a useful structured channel. """ from __future__ import annotations import json import re from .base import Finding, _iter_jsonl, _safe_json_loads class PythonParser: @classmethod def matches(cls, language: str, recipe: str) -> bool: return language == "python" and recipe in {"audit", "lint", "test", "build"} @classmethod def parse(cls, raw_log: str, exit_code: int, recipe: str) -> list[Finding]: if recipe == "lint": return cls._parse_lint(raw_log) if recipe == "audit": return cls._parse_pip_audit(raw_log) if recipe == "test": return cls._parse_pytest(raw_log, exit_code) # build if exit_code != 0: return [ Finding( kind="recipe_fail", severity="warn", code=f"exit_{exit_code}", message=f"python build exited with status {exit_code}", ) ] return [] # ---- lint -------------------------------------------------------------- @classmethod def _parse_lint(cls, raw_log: str) -> list[Finding]: """ruff emits a JSON array; mypy --output=json emits JSON-lines. Both are common in a lint recipe (often `ruff && mypy`). We parse whichever shape applies, attempting both.""" out: list[Finding] = [] out.extend(cls._parse_ruff(raw_log)) out.extend(cls._parse_mypy(raw_log)) return out @classmethod def _parse_ruff(cls, raw_log: str) -> list[Finding]: # Find the JSON array — it might be preceded by a banner / shell echo. arr = _extract_json_array(raw_log) if not isinstance(arr, list): return [] out: list[Finding] = [] for item in arr: if not isinstance(item, dict): continue # Ruff entries: {code, message, filename, location:{row,column}, ...} # We also tolerate mypy-shaped entries here in case they leak in; # mypy's _parse will skip them. if "code" not in item or "message" not in item: continue loc = item.get("location") or {} file = item.get("filename") row = loc.get("row") if isinstance(loc, dict) else None fix = item.get("fix") or {} suggested = None if isinstance(fix, dict): suggested = fix.get("message") or fix.get("applicability") out.append( Finding( kind="lint", severity="warn", file=file, line=row, code=item.get("code"), message=item.get("message") or "", suggested_fix=suggested, raw_json=json.dumps(item), ) ) return out @classmethod def _parse_mypy(cls, raw_log: str) -> list[Finding]: out: list[Finding] = [] for obj in _iter_jsonl(raw_log): if not isinstance(obj, dict): continue # mypy JSON-line: {"file":..., "line":..., "column":..., # "severity":"error"|"note", "message":..., # "code":...} if "file" not in obj or "message" not in obj or "severity" not in obj: continue sev_in = obj.get("severity") or "warn" sev = "error" if sev_in == "error" else "warn" out.append( Finding( kind="lint", severity=sev, file=obj.get("file"), line=obj.get("line"), code=obj.get("code") or "mypy", message=obj.get("message") or "", raw_json=json.dumps(obj), ) ) return out # ---- pip-audit --------------------------------------------------------- @classmethod def _parse_pip_audit(cls, raw_log: str) -> list[Finding]: """pip-audit -f json shape: {"dependencies":[{"name":..., "version":..., "vulns":[{"id":..., "fix_versions":[...], "description":...}]}]} """ envelope = _extract_json_object(raw_log) if envelope is None: return [] deps = envelope.get("dependencies") or [] out: list[Finding] = [] for dep in deps: if not isinstance(dep, dict): continue pkg = dep.get("name") or "?" ver = dep.get("version") or "?" for vuln in dep.get("vulns") or []: if not isinstance(vuln, dict): continue vid = vuln.get("id") or "PYSEC-?" desc = vuln.get("description") or "vulnerability" fixes = vuln.get("fix_versions") or [] fix_str = ", ".join(fixes) if fixes else "no fix available" out.append( Finding( kind="cve", severity="high", code=vid, message=f"{pkg} {ver}: {desc} — fixed in {fix_str}", suggested_fix=( f"bump {pkg} to {fixes[0]}" if fixes else None ), raw_json=json.dumps({"dep": dep, "vuln": vuln}), extras={ "package": pkg, "version": ver, "fixed_in": fixes, "advisory": vid, }, ) ) return out # ---- pytest ------------------------------------------------------------ _PYTEST_FAILED_RE = re.compile(r"^FAILED\s+(\S+)\s*(?:-\s*(.+))?$") @classmethod def _parse_pytest(cls, raw_log: str, exit_code: int) -> list[Finding]: if exit_code == 0: return [] out: list[Finding] = [] seen: set[str] = set() for line in raw_log.splitlines(): m = cls._PYTEST_FAILED_RE.match(line.strip()) if not m: continue name = m.group(1) reason = (m.group(2) or "").strip() if name in seen: continue seen.add(name) # Split file::test_name to fill `file` column when possible. file: str | None = None if "::" in name: file = name.split("::", 1)[0] out.append( Finding( kind="test_fail", severity="error", file=file, code=name, message=f"pytest {name} failed" + (f": {reason}" if reason else ""), ) ) if not out: out.append( Finding( kind="test_fail", severity="error", code=f"exit_{exit_code}", message=( f"pytest exited {exit_code} but no FAILED lines " f"detected; test process exited non-zero" ), ) ) return out def _extract_json_array(text: str) -> list | None: """Find the first balanced ``[...]`` block and json.loads it.""" start = text.find("[") while start != -1: depth = 0 in_str = False esc = False for i in range(start, len(text)): c = text[i] if in_str: if esc: esc = False elif c == "\\": esc = True elif c == '"': in_str = False continue if c == '"': in_str = True elif c == "[": depth += 1 elif c == "]": depth -= 1 if depth == 0: candidate = text[start : i + 1] parsed = _safe_json_loads(candidate) if isinstance(parsed, list): return parsed break start = text.find("[", start + 1) return None def _extract_json_object(text: str) -> dict | None: """Like _extract_json_array but for objects.""" start = text.find("{") while start != -1: depth = 0 in_str = False esc = False for i in range(start, len(text)): c = text[i] if in_str: if esc: esc = False elif c == "\\": esc = True elif c == '"': in_str = False continue if c == '"': in_str = True elif c == "{": depth += 1 elif c == "}": depth -= 1 if depth == 0: candidate = text[start : i + 1] parsed = _safe_json_loads(candidate) if isinstance(parsed, dict): return parsed break start = text.find("{", start + 1) return None