From 129d63039169ea5240034c55dd4bb633bb844066 Mon Sep 17 00:00:00 2001 From: Kayos Date: Wed, 29 Apr 2026 13:56:44 -0700 Subject: [PATCH] workspace: per-project lock + fetch into remote-tracking refs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs caught by the first real concurrent dogfood (15 SDK build jobs queued at once for clawdforge): 1. Concurrent fetch race — multiple jobs from same project all called `git fetch +refs/heads/*:refs/heads/*` simultaneously. Git refuses to fetch into a local branch ref that another worktree has checked out, so jobs after the first failed fast with: fatal: refusing to fetch into branch 'refs/heads/main' checked out at '/workspace/clawdforge/' 2. Worktree-from-local-branch — `worktree add ... main` reserved the local branch ref to the worktree, blocking subsequent fetches even when the lock above wasn't held. Fix: - Per-project asyncio.Lock around the materialize() body (clone + fetch + worktree-add). Different projects still parallelize; same project serializes through the cache dir. - Fetch into remote-tracking refs only: +refs/heads/*:refs/remotes/origin/* Local refs/heads/* are never written, so no worktree can hold them. - worktree add uses `--detach origin/` so each worktree is at the remote-tracking ref. Multiple detached worktrees share the same remote ref without conflict. The recipe itself runs outside the lock, so concurrency=4 still parallelizes the actual build/test work — only the brief materialize step (clone or fetch + worktree create) serializes. Tests still 6/6 green on test_runner.py (uses _StubWorkspace, not the real WorkspaceManager, so it's unaffected — the real exercise is the live re-queue against fixed code). --- crafting_table/workspace.py | 81 +++++++++++++++++++++++++++---------- 1 file changed, 59 insertions(+), 22 deletions(-) diff --git a/crafting_table/workspace.py b/crafting_table/workspace.py index 8912143..2e310f8 100644 --- a/crafting_table/workspace.py +++ b/crafting_table/workspace.py @@ -45,6 +45,22 @@ class WorkspaceManager: def __init__(self, root: Path): self.root = Path(root) self.root.mkdir(parents=True, exist_ok=True) + # Per-project lock around materialize. Without this, concurrent jobs + # from the same project race on the bare clone's `git fetch` — git + # refuses to fetch into a local branch that another worktree has + # checked out, AND the index file gets contended under parallel + # fetches even into remote-tracking refs. Lock is per-project (not + # global) so different projects materialize in parallel. + self._project_locks: dict[str, asyncio.Lock] = {} + self._project_locks_guard = asyncio.Lock() + + async def _project_lock(self, project: str) -> asyncio.Lock: + async with self._project_locks_guard: + lock = self._project_locks.get(project) + if lock is None: + lock = asyncio.Lock() + self._project_locks[project] = lock + return lock def paths_for(self, *, project: str, job_id: str) -> WorkspacePaths: project_root = self.root / project @@ -72,30 +88,51 @@ class WorkspaceManager: paths = self.paths_for(project=project, job_id=job_id) paths.project_root.mkdir(parents=True, exist_ok=True) - if not paths.cache_dir.exists(): - log_fh.write(f"[workspace] bare clone {git_url} -> {paths.cache_dir}\n") - log_fh.flush() - await _git(["clone", "--bare", git_url, str(paths.cache_dir)], log_fh, cwd=str(paths.project_root)) - else: - log_fh.write(f"[workspace] fetching latest into {paths.cache_dir}\n") - log_fh.flush() - # --prune drops branches deleted upstream so worktree-add doesn't - # silently land on a stale ref. - await _git(["fetch", "--prune", "origin", "+refs/heads/*:refs/heads/*"], log_fh, cwd=str(paths.cache_dir)) + # Hold the per-project lock for the whole clone/fetch/worktree-add + # sequence. Releases when the worktree is on disk; the recipe runs + # outside the lock so concurrency=4 still stays parallel during the + # actual build/test. + lock = await self._project_lock(project) + async with lock: + if not paths.cache_dir.exists(): + log_fh.write(f"[workspace] bare clone {git_url} -> {paths.cache_dir}\n") + log_fh.flush() + await _git(["clone", "--bare", git_url, str(paths.cache_dir)], log_fh, cwd=str(paths.project_root)) + # Configure the bare to fetch into remote-tracking refs only — + # worktrees check out from origin/ (detached HEAD) + # rather than from refs/heads/, so multiple worktrees + # can share the same upstream ref without conflict. + await _git( + ["config", "remote.origin.fetch", "+refs/heads/*:refs/remotes/origin/*"], + log_fh, cwd=str(paths.cache_dir), + ) + else: + log_fh.write(f"[workspace] fetching latest into {paths.cache_dir}\n") + log_fh.flush() + # Fetch into remote-tracking refs (refs/remotes/origin/*), + # NOT into refs/heads/* — local branches can be held by other + # worktrees and git refuses to overwrite them. --prune drops + # branches deleted upstream so we don't keep stale tracking. + await _git( + ["fetch", "--prune", "origin", "+refs/heads/*:refs/remotes/origin/*"], + log_fh, cwd=str(paths.cache_dir), + ) - if paths.worktree_dir.exists(): - # A previous run for the same job_id (replay or restart). Wipe it. - log_fh.write(f"[workspace] removing existing worktree {paths.worktree_dir}\n") - log_fh.flush() - await self._cleanup_worktree(paths) + if paths.worktree_dir.exists(): + # A previous run for the same job_id (replay or restart). Wipe it. + log_fh.write(f"[workspace] removing existing worktree {paths.worktree_dir}\n") + log_fh.flush() + await self._cleanup_worktree(paths) - log_fh.write(f"[workspace] worktree add {paths.worktree_dir} branch={branch}\n") - log_fh.flush() - await _git( - ["worktree", "add", "--force", str(paths.worktree_dir), branch], - log_fh, - cwd=str(paths.cache_dir), - ) + log_fh.write(f"[workspace] worktree add {paths.worktree_dir} from origin/{branch}\n") + log_fh.flush() + # Detached worktree at origin/. Multiple worktrees can + # share the same remote-tracking ref without conflict. + await _git( + ["worktree", "add", "--detach", str(paths.worktree_dir), f"origin/{branch}"], + log_fh, + cwd=str(paths.cache_dir), + ) return paths async def cleanup(self, paths: WorkspacePaths) -> None: