"""Workspace materialization — git clone + worktree + gc. Layout (per project): /workspace//.cache/ bare clone of the upstream /workspace/// worktree for the requested branch+sha Strategy: - First time we see a project: bare clone --bare to .cache/. - Subsequent jobs: `git fetch` the cache, then `git worktree add` the requested branch into the per-job dir. - After the job finishes: `git worktree remove` the per-job dir. Bare clone stays put for the next run. - Periodic gc: any worktree dir older than CRAFTING_GC_AGE seconds gets pruned (defends against orphans from runner crashes). Why bare + worktree (not fresh full clones): cargo/maven/gradle caches live in /caches, but the source tree itself is fast to materialize this way and leaves zero cross-job contamination. Fresh git clone of a 100MB repo takes seconds; worktree-add is milliseconds. Recipe commands run in the worktree dir (subproject path resolved against the worktree root). """ from __future__ import annotations import asyncio import logging import shutil import time from dataclasses import dataclass from pathlib import Path log = logging.getLogger("crafting_table.workspace") @dataclass class WorkspacePaths: project_root: Path cache_dir: Path # .cache/ — bare clone worktree_dir: Path # per-job worktree class WorkspaceManager: def __init__(self, root: Path): self.root = Path(root) self.root.mkdir(parents=True, exist_ok=True) # Per-project lock around materialize. Without this, concurrent jobs # from the same project race on the bare clone's `git fetch` — git # refuses to fetch into a local branch that another worktree has # checked out, AND the index file gets contended under parallel # fetches even into remote-tracking refs. Lock is per-project (not # global) so different projects materialize in parallel. self._project_locks: dict[str, asyncio.Lock] = {} self._project_locks_guard = asyncio.Lock() async def _project_lock(self, project: str) -> asyncio.Lock: async with self._project_locks_guard: lock = self._project_locks.get(project) if lock is None: lock = asyncio.Lock() self._project_locks[project] = lock return lock def paths_for(self, *, project: str, job_id: str) -> WorkspacePaths: project_root = self.root / project return WorkspacePaths( project_root=project_root, cache_dir=project_root / ".cache", worktree_dir=project_root / job_id, ) async def materialize( self, *, project: str, job_id: str, git_url: str, branch: str, log_fh, ) -> WorkspacePaths: """Ensure the per-job worktree exists and is checked out at branch. Writes git progress lines into log_fh. Raises CalledProcessError-like exceptions through if a git step fails — runner.py catches and marks the job failed. """ paths = self.paths_for(project=project, job_id=job_id) paths.project_root.mkdir(parents=True, exist_ok=True) # Hold the per-project lock for the whole clone/fetch/worktree-add # sequence. Releases when the worktree is on disk; the recipe runs # outside the lock so concurrency=4 still stays parallel during the # actual build/test. lock = await self._project_lock(project) async with lock: if not paths.cache_dir.exists(): log_fh.write(f"[workspace] bare clone {git_url} -> {paths.cache_dir}\n") log_fh.flush() await _git(["clone", "--bare", git_url, str(paths.cache_dir)], log_fh, cwd=str(paths.project_root)) # Configure the bare to fetch into remote-tracking refs only — # worktrees check out from origin/ (detached HEAD) # rather than from refs/heads/, so multiple worktrees # can share the same upstream ref without conflict. await _git( ["config", "remote.origin.fetch", "+refs/heads/*:refs/remotes/origin/*"], log_fh, cwd=str(paths.cache_dir), ) # `git clone --bare` populates refs/heads/* but NOT # refs/remotes/origin/* — we need an explicit fetch to # materialize the remote-tracking refs the worktree-add # below will use. await _git( ["fetch", "--prune", "origin", "+refs/heads/*:refs/remotes/origin/*"], log_fh, cwd=str(paths.cache_dir), ) else: log_fh.write(f"[workspace] fetching latest into {paths.cache_dir}\n") log_fh.flush() # Fetch into remote-tracking refs (refs/remotes/origin/*), # NOT into refs/heads/* — local branches can be held by other # worktrees and git refuses to overwrite them. --prune drops # branches deleted upstream so we don't keep stale tracking. await _git( ["fetch", "--prune", "origin", "+refs/heads/*:refs/remotes/origin/*"], log_fh, cwd=str(paths.cache_dir), ) if paths.worktree_dir.exists(): # A previous run for the same job_id (replay or restart). Wipe it. log_fh.write(f"[workspace] removing existing worktree {paths.worktree_dir}\n") log_fh.flush() await self._cleanup_worktree(paths) log_fh.write(f"[workspace] worktree add {paths.worktree_dir} from origin/{branch}\n") log_fh.flush() # Detached worktree at origin/. Multiple worktrees can # share the same remote-tracking ref without conflict. await _git( ["worktree", "add", "--detach", str(paths.worktree_dir), f"origin/{branch}"], log_fh, cwd=str(paths.cache_dir), ) return paths async def cleanup(self, paths: WorkspacePaths) -> None: """Remove a worktree post-job. Best-effort — failures logged, not raised.""" try: await self._cleanup_worktree(paths) except Exception as e: log.warning("worktree cleanup failed for %s: %s", paths.worktree_dir, e) async def _cleanup_worktree(self, paths: WorkspacePaths) -> None: if paths.worktree_dir.exists() and paths.cache_dir.exists(): try: proc = await asyncio.create_subprocess_exec( "git", "worktree", "remove", "--force", str(paths.worktree_dir), cwd=str(paths.cache_dir), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) await proc.wait() except Exception: pass # Fallback: rmtree if the worktree dir is still around. if paths.worktree_dir.exists(): shutil.rmtree(paths.worktree_dir, ignore_errors=True) async def gc(self, *, age_secs: int) -> dict: """Sweep worktrees older than age_secs. Returns counters.""" cutoff = time.time() - age_secs removed = 0 scanned = 0 for project_dir in self.root.iterdir(): if not project_dir.is_dir(): continue cache_dir = project_dir / ".cache" for child in project_dir.iterdir(): scanned += 1 if child.name == ".cache": continue if not child.is_dir(): continue try: mtime = child.stat().st_mtime except OSError: continue if mtime > cutoff: continue # Old worktree — prune. if cache_dir.exists(): try: proc = await asyncio.create_subprocess_exec( "git", "worktree", "remove", "--force", str(child), cwd=str(cache_dir), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) await proc.wait() except Exception: pass shutil.rmtree(child, ignore_errors=True) removed += 1 # Periodic `git gc` on the bare clone if it's been quiet for >7d if cache_dir.exists(): try: cache_mtime = cache_dir.stat().st_mtime if time.time() - cache_mtime > 7 * 86400: proc = await asyncio.create_subprocess_exec( "git", "gc", "--prune=now", "--quiet", cwd=str(cache_dir), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) await proc.wait() except Exception: pass return {"scanned": scanned, "removed": removed} async def _git(args: list[str], log_fh, *, cwd: str | None = None) -> None: """Run `git ` and stream stdout+stderr to log_fh.""" proc = await asyncio.create_subprocess_exec( "git", *args, cwd=cwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) assert proc.stdout is not None while True: line = await proc.stdout.readline() if not line: break log_fh.write(line.decode("utf-8", "replace")) log_fh.flush() rc = await proc.wait() if rc != 0: raise RuntimeError(f"git {args[0]} exited {rc}")