- db.py: migrations + DAOs for tokens / projects / jobs / findings (SQLite WAL)
- auth.py: SHA-256 bearer hashing + LAN-CIDR allowlist + admin/app token tiers
- models.py: Pydantic shapes (Project, Subproject, Schedule, Notify, Job, CreateJobRequest)
- server.py: FastAPI on port 8810; /healthz, /admin/tokens/*, /projects/*, /jobs, /jobs/{id}, /jobs/{id}/log, /jobs/{id}/findings
- runner.py: bounded asyncio pool, per-job timeout with process-group SIGTERM→SIGKILL escalation, orphaned-job recovery on boot
- workspace.py: bare-clone + worktree materialization, gc
- config.py: env-driven
- 62 tests across db / auth / projects / jobs / runner / e2e — all green
Accessing a project that belongs to a different token returns 404 (not 403), so the response does not leak whether the project exists.
Bearer tokens hashed at rest; admin token bootstrapped on first boot.
Recipe subprocess uses start_new_session=True so killpg targets the
whole process tree on timeout — child processes can't escape SIGKILL.
The pump task is guarded with wait_for(2s) plus a cancel fallback, to cover
any orphan that survives the group kill.
Wave 2 (parsers + findings extraction + MCP + email digest) pending.
Spec: memory/spec-crafting-table.md
122 lines
4.2 KiB
Python
122 lines
4.2 KiB
Python
"""Auth — bearer hashing, IP allowlist, admin vs app, revoked token reject."""
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
from fastapi import HTTPException
|
|
|
|
from crafting_table.auth import Auth, _ip_in_any, _const_eq
|
|
|
|
|
|
class _FakeRequest:
|
|
def __init__(self, ip: str):
|
|
class C:
|
|
host = ip
|
|
self.client = C()
|
|
|
|
|
|
def test_bearer_hash_lookup_match(db_only):
    """A token inserted with a bearer can be looked up again by that bearer."""
    db_only.insert_token(name="x", bearer="ct_secret_x", is_admin=False, ip_cidrs=None)

    record = db_only.lookup_token_by_bearer("ct_secret_x")

    assert record is not None
    assert record["name"] == "x"
|
|
|
|
|
|
def test_bearer_hash_lookup_miss(db_only):
    """Looking up a bearer that was never inserted yields ``None``."""
    missing = db_only.lookup_token_by_bearer("definitely-not-real")
    assert missing is None
|
|
|
|
|
|
def test_ip_in_any_loopback_always_allowed():
    """IPv4 and IPv6 loopback addresses pass even with an empty CIDR list."""
    for loopback in ("127.0.0.1", "::1"):
        assert _ip_in_any(loopback, []) is True
|
|
|
|
|
|
def test_ip_in_any_match_cidr():
    """An address inside a listed CIDR is reported as allowed."""
    cases = (
        ("192.168.0.5", ["192.168.0.0/16"]),
        ("10.1.2.3", ["10.0.0.0/8"]),
    )
    for address, cidrs in cases:
        assert _ip_in_any(address, cidrs) is True
|
|
|
|
|
|
def test_ip_in_any_miss():
    """An address outside every listed CIDR is reported as not allowed."""
    lan_ranges = ["192.168.0.0/16", "10.0.0.0/8"]
    assert _ip_in_any("8.8.8.8", lan_ranges) is False
|
|
|
|
|
|
def test_ip_in_any_invalid_input():
    """A string that is not an IP address is reported as not allowed."""
    outcome = _ip_in_any("not-an-ip", ["10.0.0.0/8"])
    assert outcome is False
|
|
|
|
|
|
def test_const_eq_basic():
    """``_const_eq`` is True for equal strings, False for differing content or length."""
    cases = (
        ("abc", "abc", True),    # identical
        ("abc", "abd", False),   # same length, different content
        ("abc", "abcd", False),  # different length
    )
    for left, right, expected in cases:
        assert _const_eq(left, right) is expected
|
|
|
|
|
|
def test_require_app_missing_bearer(db_only):
    """``require_app`` raises HTTP 401 when no authorization value is supplied."""
    auth = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"])
    request = _FakeRequest("127.0.0.1")

    with pytest.raises(HTTPException) as excinfo:
        auth.require_app(request, None)

    assert excinfo.value.status_code == 401
|
|
|
|
|
|
def test_require_app_bad_bearer(db_only):
    """``require_app`` raises HTTP 403 for a bearer that matches no stored token."""
    auth = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"])
    request = _FakeRequest("127.0.0.1")

    with pytest.raises(HTTPException) as excinfo:
        auth.require_app(request, "Bearer not-a-real-token")

    assert excinfo.value.status_code == 403
|
|
|
|
|
|
def test_require_app_revoked_rejects(db_only):
    """A token that has been revoked is rejected by ``require_app`` with HTTP 403."""
    auth = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"])

    # Create the token, then revoke it before it is ever presented.
    db_only.insert_token(name="r", bearer="ct_r", is_admin=False, ip_cidrs=None)
    db_only.revoke_token("r")

    request = _FakeRequest("127.0.0.1")
    with pytest.raises(HTTPException) as excinfo:
        auth.require_app(request, "Bearer ct_r")

    assert excinfo.value.status_code == 403
|
|
|
|
|
|
def test_require_app_per_token_ip_allowlist(db_only):
    """A token with its own CIDR list is accepted from an in-range address and from loopback."""
    auth = Auth(db=db_only, lan_cidrs=["127.0.0.0/8", "10.0.0.0/8"])
    db_only.insert_token(name="ip", bearer="ct_ip", is_admin=False, ip_cidrs=["10.0.0.0/8"])

    # 10.x is in the allowlist
    from_lan = auth.require_app(_FakeRequest("10.0.0.5"), "Bearer ct_ip")
    assert from_lan.name == "ip"

    # Loopback is allowed because _ip_in_any short-circuits on is_loopback
    from_loopback = auth.require_app(_FakeRequest("127.0.0.1"), "Bearer ct_ip")
    assert from_loopback.name == "ip"
|
|
|
|
|
|
def test_require_admin_rejects_non_admin(db_only):
    """``require_admin`` raises HTTP 403 for a token inserted with ``is_admin=False``."""
    auth = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"])
    db_only.insert_token(name="u", bearer="ct_u", is_admin=False, ip_cidrs=None)

    request = _FakeRequest("127.0.0.1")
    with pytest.raises(HTTPException) as excinfo:
        auth.require_admin(request, "Bearer ct_u")

    assert excinfo.value.status_code == 403
|
|
|
|
|
|
def test_require_admin_accepts_admin(db_only):
    """``require_admin`` returns the token when it was inserted with ``is_admin=True``."""
    auth = Auth(db=db_only, lan_cidrs=["127.0.0.0/8"])
    db_only.insert_token(name="adm", bearer="ct_adm", is_admin=True, ip_cidrs=None)

    admin_token = auth.require_admin(_FakeRequest("127.0.0.1"), "Bearer ct_adm")

    assert admin_token.is_admin is True
|
|
|
|
|
|
def test_global_ip_blocks_off_lan(db_only):
    """``require_global_ip`` raises HTTP 403 for an address outside the configured LAN CIDRs."""
    auth = Auth(db=db_only, lan_cidrs=["192.168.0.0/16"])
    off_lan_request = _FakeRequest("8.8.8.8")

    with pytest.raises(HTTPException) as excinfo:
        auth.require_global_ip(off_lan_request)

    assert excinfo.value.status_code == 403
|
|
|
|
|
|
def test_bootstrap_writes_admin_bearer(tmp_workspace):
    """``bootstrap_admin`` creates the bearer file once and returns the same bearer afterwards.

    The first call must leave a file at the given path with mode 0o600 and
    return a bearer starting with ``ct_``; a second call with the same path
    must return the identical bearer.
    """
    from crafting_table.auth import Auth
    from crafting_table.db import DB

    bearer_path = tmp_workspace["admin_bearer"]
    database = DB(str(tmp_workspace["db_path"]))
    auth = Auth(db=database, lan_cidrs=["127.0.0.0/8"])

    first = auth.bootstrap_admin(bearer_path)
    assert bearer_path.exists()
    # Only the permission bits matter here; mask off the file-type bits.
    permission_bits = bearer_path.stat().st_mode & 0o777
    assert permission_bits == 0o600
    assert first.startswith("ct_")

    # Second call returns the same bearer (read off disk)
    second = auth.bootstrap_admin(bearer_path)
    assert second == first
|