workspace: per-project lock + fetch into remote-tracking refs
Two bugs caught by the first real concurrent dogfood (15 SDK build jobs
queued at once for clawdforge):
1. Concurrent fetch race — multiple jobs from the same project all called
`git fetch +refs/heads/*:refs/heads/*` simultaneously. Git refuses
to fetch into a local branch ref that another worktree has checked
out, so jobs after the first failed fast with:
fatal: refusing to fetch into branch 'refs/heads/main' checked
out at '/workspace/clawdforge/<other-job-id>'
2. Worktree-from-local-branch — `worktree add ... main` reserved the
local branch ref for that worktree, blocking subsequent fetches even
when the fetches themselves did not run concurrently.
Fix:
- Per-project asyncio.Lock around the materialize() body (clone +
fetch + worktree-add). Different projects still parallelize; same
project serializes through the cache dir.
- Fetch into remote-tracking refs only:
+refs/heads/*:refs/remotes/origin/*
Local refs/heads/* are never written, so no worktree can hold them.
- worktree add uses `--detach origin/<branch>` so each worktree is at
the remote-tracking ref. Multiple detached worktrees share the same
remote ref without conflict.
The recipe itself runs outside the lock, so concurrency=4 still
parallelizes the actual build/test work — only the brief
materialize step (clone or fetch + worktree create) serializes.
Tests are still 6/6 green on test_runner.py (it uses _StubWorkspace, not
the real WorkspaceManager, so it is unaffected — the real exercise is the
live re-queue against the fixed code).
This commit is contained in:
parent
61a9814b67
commit
129d630391
1 changed file with 59 additions and 22 deletions
|
|
@ -45,6 +45,22 @@ class WorkspaceManager:
|
|||
def __init__(self, root: Path):
|
||||
self.root = Path(root)
|
||||
self.root.mkdir(parents=True, exist_ok=True)
|
||||
# Per-project lock around materialize. Without this, concurrent jobs
|
||||
# from the same project race on the bare clone's `git fetch` — git
|
||||
# refuses to fetch into a local branch that another worktree has
|
||||
# checked out, AND the index file gets contended under parallel
|
||||
# fetches even into remote-tracking refs. Lock is per-project (not
|
||||
# global) so different projects materialize in parallel.
|
||||
self._project_locks: dict[str, asyncio.Lock] = {}
|
||||
self._project_locks_guard = asyncio.Lock()
|
||||
|
||||
async def _project_lock(self, project: str) -> asyncio.Lock:
|
||||
async with self._project_locks_guard:
|
||||
lock = self._project_locks.get(project)
|
||||
if lock is None:
|
||||
lock = asyncio.Lock()
|
||||
self._project_locks[project] = lock
|
||||
return lock
|
||||
|
||||
def paths_for(self, *, project: str, job_id: str) -> WorkspacePaths:
|
||||
project_root = self.root / project
|
||||
|
|
@ -72,30 +88,51 @@ class WorkspaceManager:
|
|||
paths = self.paths_for(project=project, job_id=job_id)
|
||||
paths.project_root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if not paths.cache_dir.exists():
|
||||
log_fh.write(f"[workspace] bare clone {git_url} -> {paths.cache_dir}\n")
|
||||
log_fh.flush()
|
||||
await _git(["clone", "--bare", git_url, str(paths.cache_dir)], log_fh, cwd=str(paths.project_root))
|
||||
else:
|
||||
log_fh.write(f"[workspace] fetching latest into {paths.cache_dir}\n")
|
||||
log_fh.flush()
|
||||
# --prune drops branches deleted upstream so worktree-add doesn't
|
||||
# silently land on a stale ref.
|
||||
await _git(["fetch", "--prune", "origin", "+refs/heads/*:refs/heads/*"], log_fh, cwd=str(paths.cache_dir))
|
||||
# Hold the per-project lock for the whole clone/fetch/worktree-add
|
||||
# sequence. Releases when the worktree is on disk; the recipe runs
|
||||
# outside the lock so concurrency=4 still stays parallel during the
|
||||
# actual build/test.
|
||||
lock = await self._project_lock(project)
|
||||
async with lock:
|
||||
if not paths.cache_dir.exists():
|
||||
log_fh.write(f"[workspace] bare clone {git_url} -> {paths.cache_dir}\n")
|
||||
log_fh.flush()
|
||||
await _git(["clone", "--bare", git_url, str(paths.cache_dir)], log_fh, cwd=str(paths.project_root))
|
||||
# Configure the bare to fetch into remote-tracking refs only —
|
||||
# worktrees check out from origin/<branch> (detached HEAD)
|
||||
# rather than from refs/heads/<branch>, so multiple worktrees
|
||||
# can share the same upstream ref without conflict.
|
||||
await _git(
|
||||
["config", "remote.origin.fetch", "+refs/heads/*:refs/remotes/origin/*"],
|
||||
log_fh, cwd=str(paths.cache_dir),
|
||||
)
|
||||
else:
|
||||
log_fh.write(f"[workspace] fetching latest into {paths.cache_dir}\n")
|
||||
log_fh.flush()
|
||||
# Fetch into remote-tracking refs (refs/remotes/origin/*),
|
||||
# NOT into refs/heads/* — local branches can be held by other
|
||||
# worktrees and git refuses to overwrite them. --prune drops
|
||||
# branches deleted upstream so we don't keep stale tracking.
|
||||
await _git(
|
||||
["fetch", "--prune", "origin", "+refs/heads/*:refs/remotes/origin/*"],
|
||||
log_fh, cwd=str(paths.cache_dir),
|
||||
)
|
||||
|
||||
if paths.worktree_dir.exists():
|
||||
# A previous run for the same job_id (replay or restart). Wipe it.
|
||||
log_fh.write(f"[workspace] removing existing worktree {paths.worktree_dir}\n")
|
||||
log_fh.flush()
|
||||
await self._cleanup_worktree(paths)
|
||||
if paths.worktree_dir.exists():
|
||||
# A previous run for the same job_id (replay or restart). Wipe it.
|
||||
log_fh.write(f"[workspace] removing existing worktree {paths.worktree_dir}\n")
|
||||
log_fh.flush()
|
||||
await self._cleanup_worktree(paths)
|
||||
|
||||
log_fh.write(f"[workspace] worktree add {paths.worktree_dir} branch={branch}\n")
|
||||
log_fh.flush()
|
||||
await _git(
|
||||
["worktree", "add", "--force", str(paths.worktree_dir), branch],
|
||||
log_fh,
|
||||
cwd=str(paths.cache_dir),
|
||||
)
|
||||
log_fh.write(f"[workspace] worktree add {paths.worktree_dir} from origin/{branch}\n")
|
||||
log_fh.flush()
|
||||
# Detached worktree at origin/<branch>. Multiple worktrees can
|
||||
# share the same remote-tracking ref without conflict.
|
||||
await _git(
|
||||
["worktree", "add", "--detach", str(paths.worktree_dir), f"origin/{branch}"],
|
||||
log_fh,
|
||||
cwd=str(paths.cache_dir),
|
||||
)
|
||||
return paths
|
||||
|
||||
async def cleanup(self, paths: WorkspacePaths) -> None:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue