crafting-table/tests/test_auth.py
Kayos 0ec3a04676 v0.1 wave 1 (steps 2+3+4): SQLite ledger + FastAPI skeleton + async job runner
- db.py: migrations + DAOs for tokens / projects / jobs / findings (SQLite WAL)
- auth.py: SHA-256 bearer hashing + LAN-CIDR allowlist + admin/app token tiers
- models.py: Pydantic shapes (Project, Subproject, Schedule, Notify, Job, CreateJobRequest)
- server.py: FastAPI on port 8810; /healthz, /admin/tokens/*, /projects/*, /jobs, /jobs/{id}, /jobs/{id}/log, /jobs/{id}/findings
- runner.py: bounded asyncio pool, per-job timeout with process-group SIGTERM→SIGKILL escalation, orphaned-job recovery on boot
- workspace.py: bare-clone + worktree materialization, gc
- config.py: env-driven
- 62 tests across db / auth / projects / jobs / runner / e2e — all green

Cross-token project access returns 404 (not 403) — existence-leak guard.
Bearer tokens hashed at rest; admin token bootstrapped on first boot.
Recipe subprocess uses start_new_session=True so killpg targets the
whole process tree on timeout — child processes can't escape SIGKILL.
Pump task guarded with wait_for(2s) + cancel fallback against any
orphan that survives the group kill.

Wave 2 (parsers + findings extraction + MCP + email digest) pending.

Spec: memory/spec-crafting-table.md
2026-04-29 08:17:41 -07:00

122 lines
4.2 KiB
Python

"""Auth — bearer hashing, IP allowlist, admin vs app, revoked token reject."""
from __future__ import annotations
import pytest
from fastapi import HTTPException
from crafting_table.auth import Auth, _ip_in_any, _const_eq
class _FakeRequest:
def __init__(self, ip: str):
class C:
host = ip
self.client = C()
def test_bearer_hash_lookup_match(db_only):
db_only.insert_token(name="x", bearer="ct_secret_x", is_admin=False, ip_cidrs=None)
rec = db_only.lookup_token_by_bearer("ct_secret_x")
assert rec is not None
assert rec["name"] == "x"
def test_bearer_hash_lookup_miss(db_only):
assert db_only.lookup_token_by_bearer("definitely-not-real") is None
def test_ip_in_any_loopback_always_allowed():
assert _ip_in_any("127.0.0.1", []) is True
assert _ip_in_any("::1", []) is True
def test_ip_in_any_match_cidr():
assert _ip_in_any("192.168.0.5", ["192.168.0.0/16"]) is True
assert _ip_in_any("10.1.2.3", ["10.0.0.0/8"]) is True
def test_ip_in_any_miss():
assert _ip_in_any("8.8.8.8", ["192.168.0.0/16", "10.0.0.0/8"]) is False
def test_ip_in_any_invalid_input():
assert _ip_in_any("not-an-ip", ["10.0.0.0/8"]) is False
def test_const_eq_basic():
assert _const_eq("abc", "abc") is True
assert _const_eq("abc", "abd") is False
assert _const_eq("abc", "abcd") is False
def test_require_app_missing_bearer(db_only):
a = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"])
with pytest.raises(HTTPException) as ei:
a.require_app(_FakeRequest("127.0.0.1"), None)
assert ei.value.status_code == 401
def test_require_app_bad_bearer(db_only):
a = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"])
with pytest.raises(HTTPException) as ei:
a.require_app(_FakeRequest("127.0.0.1"), "Bearer not-a-real-token")
assert ei.value.status_code == 403
def test_require_app_revoked_rejects(db_only):
a = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"])
db_only.insert_token(name="r", bearer="ct_r", is_admin=False, ip_cidrs=None)
db_only.revoke_token("r")
with pytest.raises(HTTPException) as ei:
a.require_app(_FakeRequest("127.0.0.1"), "Bearer ct_r")
assert ei.value.status_code == 403
def test_require_app_per_token_ip_allowlist(db_only):
a = Auth(db=db_only, lan_cidrs=["127.0.0.0/8", "10.0.0.0/8"])
db_only.insert_token(name="ip", bearer="ct_ip", is_admin=False, ip_cidrs=["10.0.0.0/8"])
# 10.x is in the allowlist
tok = a.require_app(_FakeRequest("10.0.0.5"), "Bearer ct_ip")
assert tok.name == "ip"
# Loopback is allowed because _ip_in_any short-circuits on is_loopback
tok2 = a.require_app(_FakeRequest("127.0.0.1"), "Bearer ct_ip")
assert tok2.name == "ip"
def test_require_admin_rejects_non_admin(db_only):
a = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"])
db_only.insert_token(name="u", bearer="ct_u", is_admin=False, ip_cidrs=None)
with pytest.raises(HTTPException) as ei:
a.require_admin(_FakeRequest("127.0.0.1"), "Bearer ct_u")
assert ei.value.status_code == 403
def test_require_admin_accepts_admin(db_only):
a = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"])
db_only.insert_token(name="adm", bearer="ct_adm", is_admin=True, ip_cidrs=None)
tok = a.require_admin(_FakeRequest("127.0.0.1"), "Bearer ct_adm")
assert tok.is_admin is True
def test_global_ip_blocks_off_lan(db_only):
a = Auth(db=db_only, lan_cidrs=["192.168.0.0/16"])
with pytest.raises(HTTPException) as ei:
a.require_global_ip(_FakeRequest("8.8.8.8"))
assert ei.value.status_code == 403
def test_bootstrap_writes_admin_bearer(tmp_workspace):
"""First boot mints + writes the bearer file. Second boot reads existing."""
from crafting_table.db import DB
from crafting_table.auth import Auth
db = DB(str(tmp_workspace["db_path"]))
a = Auth(db=db, lan_cidrs=["127.0.0.0/8"])
bearer1 = a.bootstrap_admin(tmp_workspace["admin_bearer"])
assert tmp_workspace["admin_bearer"].exists()
assert tmp_workspace["admin_bearer"].stat().st_mode & 0o777 == 0o600
assert bearer1.startswith("ct_")
# Second call returns the same bearer (read off disk)
bearer2 = a.bootstrap_admin(tmp_workspace["admin_bearer"])
assert bearer2 == bearer1