crafting-table/crafting_table/workspace.py

240 lines
9.8 KiB
Python

"""Workspace materialization — git clone + worktree + gc.
Layout (per project):
/workspace/<project>/.cache/ bare clone of the upstream
/workspace/<project>/<job_id>/ worktree for the requested branch+sha
Strategy:
- First time we see a project: `git clone --bare` the upstream into .cache/.
- Subsequent jobs: `git fetch` the cache, then `git worktree add` the
requested branch into the per-job dir.
- After the job finishes: `git worktree remove` the per-job dir. Bare clone
stays put for the next run.
- Periodic gc: any worktree dir older than CRAFTING_GC_AGE seconds gets
pruned (defends against orphans from runner crashes).
Why bare + worktree (not fresh full clones): cargo/maven/gradle caches live
in /caches, but the source tree itself is fast to materialize this way and
leaves zero cross-job contamination. Fresh git clone of a 100MB repo takes
seconds; worktree-add is milliseconds.
Recipe commands run in the worktree dir (subproject path resolved against
the worktree root).
"""
from __future__ import annotations
import asyncio
import logging
import shutil
import time
from dataclasses import dataclass
from pathlib import Path
log = logging.getLogger("crafting_table.workspace")
@dataclass
class WorkspacePaths:
    """Filesystem locations that make up one job's workspace."""

    # Directory that holds everything belonging to the project.
    project_root: Path
    # Bare clone of the upstream (the project's `.cache/` directory).
    cache_dir: Path
    # Worktree dedicated to a single job.
    worktree_dir: Path
class WorkspaceManager:
    """Create, remove, and garbage-collect per-job git worktrees under a root.

    For every project the manager keeps a bare clone at
    ``<root>/<project>/.cache`` and adds one detached worktree per job at
    ``<root>/<project>/<job_id>``.
    """

    # Refspec that fetches upstream branches into remote-tracking refs only.
    _FETCH_REFSPEC = "+refs/heads/*:refs/remotes/origin/*"
    # A bare clone whose directory mtime is older than this many seconds is
    # treated as idle and gets a `git gc` during the periodic sweep.
    _CACHE_GC_IDLE_SECS = 7 * 86400

    def __init__(self, root: Path):
        """Remember *root* and create it on disk if it does not exist yet."""
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)
        # Per-project lock around materialize. Without this, concurrent jobs
        # from the same project race on the bare clone's `git fetch` — git
        # refuses to fetch into a local branch that another worktree has
        # checked out, AND the index file gets contended under parallel
        # fetches even into remote-tracking refs. Lock is per-project (not
        # global) so different projects materialize in parallel.
        self._project_locks: dict[str, asyncio.Lock] = {}
        self._project_locks_guard = asyncio.Lock()

    async def _project_lock(self, project: str) -> asyncio.Lock:
        """Return the lock for *project*, creating it on first use."""
        async with self._project_locks_guard:
            lock = self._project_locks.get(project)
            if lock is None:
                lock = asyncio.Lock()
                self._project_locks[project] = lock
            return lock

    def paths_for(self, *, project: str, job_id: str) -> WorkspacePaths:
        """Compute the workspace paths for *project* / *job_id* (no I/O)."""
        project_root = self.root / project
        return WorkspacePaths(
            project_root=project_root,
            cache_dir=project_root / ".cache",
            worktree_dir=project_root / job_id,
        )

    async def materialize(
        self,
        *,
        project: str,
        job_id: str,
        git_url: str,
        branch: str,
        log_fh,
    ) -> WorkspacePaths:
        """Ensure the per-job worktree exists, detached at ``origin/<branch>``.

        Writes progress lines and git output into *log_fh* (anything with
        ``write`` and ``flush``).

        Returns:
            The paths of the materialized workspace.

        Raises:
            RuntimeError: propagated from ``_git`` when a git step exits
                non-zero. Per the original notes, runner.py catches this and
                marks the job failed.
        """
        paths = self.paths_for(project=project, job_id=job_id)
        paths.project_root.mkdir(parents=True, exist_ok=True)
        cache_cwd = str(paths.cache_dir)

        # Hold the per-project lock for the whole clone/fetch/worktree-add
        # sequence. Releases when the worktree is on disk; the recipe runs
        # outside the lock so concurrency=4 still stays parallel during the
        # actual build/test.
        lock = await self._project_lock(project)
        async with lock:
            if not paths.cache_dir.exists():
                log_fh.write(f"[workspace] bare clone {git_url} -> {paths.cache_dir}\n")
                log_fh.flush()
                # `--` stops option parsing so a URL that starts with `-`
                # can never be interpreted as a git option.
                await _git(
                    ["clone", "--bare", "--", git_url, cache_cwd],
                    log_fh,
                    cwd=str(paths.project_root),
                )
                # Configure the bare to fetch into remote-tracking refs only —
                # worktrees check out from origin/<branch> (detached HEAD)
                # rather than from refs/heads/<branch>, so multiple worktrees
                # can share the same upstream ref without conflict.
                await _git(
                    ["config", "remote.origin.fetch", self._FETCH_REFSPEC],
                    log_fh,
                    cwd=cache_cwd,
                )
            else:
                log_fh.write(f"[workspace] fetching latest into {paths.cache_dir}\n")
                log_fh.flush()

            # Fetch into remote-tracking refs (refs/remotes/origin/*), NOT
            # into refs/heads/* — local branches can be held by other
            # worktrees and git refuses to overwrite them. This also runs
            # right after a fresh clone, because `git clone --bare` populates
            # refs/heads/* but not refs/remotes/origin/*. --prune drops
            # branches deleted upstream so we don't keep stale tracking.
            await _git(
                ["fetch", "--prune", "origin", self._FETCH_REFSPEC],
                log_fh,
                cwd=cache_cwd,
            )

            if paths.worktree_dir.exists():
                # A previous run for the same job_id (replay or restart). Wipe it.
                log_fh.write(f"[workspace] removing existing worktree {paths.worktree_dir}\n")
                log_fh.flush()
                await self._cleanup_worktree(paths)

            # Drop registrations of worktrees whose directory is gone (e.g.
            # deleted out-of-band). Otherwise `worktree add` refuses a path
            # that is "missing but already registered".
            await self._run_git_quiet(["worktree", "prune"], cwd=paths.cache_dir)

            log_fh.write(f"[workspace] worktree add {paths.worktree_dir} from origin/{branch}\n")
            log_fh.flush()
            # Detached worktree at origin/<branch>. Multiple worktrees can
            # share the same remote-tracking ref without conflict.
            await _git(
                ["worktree", "add", "--detach", str(paths.worktree_dir), f"origin/{branch}"],
                log_fh,
                cwd=cache_cwd,
            )
        return paths

    async def cleanup(self, paths: WorkspacePaths) -> None:
        """Remove a worktree post-job. Best-effort — failures logged, not raised."""
        try:
            await self._cleanup_worktree(paths)
        except Exception as e:
            log.warning("worktree cleanup failed for %s: %s", paths.worktree_dir, e)

    async def _cleanup_worktree(self, paths: WorkspacePaths) -> None:
        """Remove the worktree described by *paths* (git first, then rmtree)."""
        await self._remove_worktree(paths.cache_dir, paths.worktree_dir)

    async def _remove_worktree(self, cache_dir: Path, worktree_dir: Path) -> None:
        """Remove *worktree_dir* and its registration in the bare clone.

        Tries ``git worktree remove --force`` when the bare clone exists,
        then falls back to deleting the directory tree. After the fallback
        the registration is pruned so the same path can be added again.
        """
        have_cache = cache_dir.exists()
        if have_cache and worktree_dir.exists():
            await self._run_git_quiet(
                ["worktree", "remove", "--force", str(worktree_dir)],
                cwd=cache_dir,
            )
        # Fallback: rmtree if the worktree dir is still around.
        if worktree_dir.exists():
            shutil.rmtree(worktree_dir, ignore_errors=True)
            if have_cache:
                # git did not remove it, so its bookkeeping may still list
                # the worktree; prune the stale entry.
                await self._run_git_quiet(["worktree", "prune"], cwd=cache_dir)

    @staticmethod
    async def _run_git_quiet(args: list[str], *, cwd: Path) -> int | None:
        """Run ``git <args>`` in *cwd*, discarding its output. Never raises.

        Output goes to DEVNULL rather than an unread PIPE: waiting on a
        process whose pipe nobody drains can deadlock once the OS pipe
        buffer fills up.

        Returns:
            The exit code, or None when the process could not be run.
        """
        try:
            proc = await asyncio.create_subprocess_exec(
                "git", *args,
                cwd=str(cwd),
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
            rc = await proc.wait()
        except Exception as exc:
            # Best-effort by design, but leave a trace instead of hiding it.
            log.warning("could not run git %s in %s: %s", " ".join(args), cwd, exc)
            return None
        if rc != 0:
            log.debug("git %s exited %s in %s", " ".join(args), rc, cwd)
        return rc

    async def gc(self, *, age_secs: int) -> dict:
        """Sweep worktrees older than age_secs. Returns counters.

        A child directory of a project counts as old when its mtime is at or
        before ``now - age_secs``. The returned dict has ``scanned`` (every
        entry seen inside project directories, including ``.cache`` and
        plain files) and ``removed`` (old directories swept).
        """
        cutoff = time.time() - age_secs
        removed = 0
        scanned = 0
        # Snapshot the listings: entries are deleted while we walk them.
        for project_dir in sorted(self.root.iterdir()):
            if not project_dir.is_dir():
                continue
            cache_dir = project_dir / ".cache"
            for child in sorted(project_dir.iterdir()):
                scanned += 1
                if child.name == ".cache" or not child.is_dir():
                    continue
                try:
                    mtime = child.stat().st_mtime
                except OSError:
                    continue
                if mtime > cutoff:
                    continue
                # Old worktree — prune.
                await self._remove_worktree(cache_dir, child)
                removed += 1

            # Periodic `git gc` on the bare clone if it's been quiet for >7d
            if not cache_dir.exists():
                continue
            try:
                idle_secs = time.time() - cache_dir.stat().st_mtime
            except OSError:
                continue
            if idle_secs > self._CACHE_GC_IDLE_SECS:
                # `--prune=now` is unsafe while another process writes to the
                # repository, so exclude a concurrent materialize (fetch).
                lock = await self._project_lock(project_dir.name)
                async with lock:
                    await self._run_git_quiet(
                        ["gc", "--prune=now", "--quiet"], cwd=cache_dir
                    )
        return {"scanned": scanned, "removed": removed}
async def _git(args: list[str], log_fh, *, cwd: str | None = None) -> None:
"""Run `git <args>` and stream stdout+stderr to log_fh."""
proc = await asyncio.create_subprocess_exec(
"git", *args,
cwd=cwd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
assert proc.stdout is not None
while True:
line = await proc.stdout.readline()
if not line:
break
log_fh.write(line.decode("utf-8", "replace"))
log_fh.flush()
rc = await proc.wait()
if rc != 0:
raise RuntimeError(f"git {args[0]} exited {rc}")