240 lines
9.8 KiB
Python
240 lines
9.8 KiB
Python
"""Workspace materialization — git clone + worktree + gc.

Layout (per project):
    /workspace/<project>/.cache/      bare clone of the upstream
    /workspace/<project>/<job_id>/    worktree for the requested branch+sha

Strategy:
  - First time we see a project: `git clone --bare` the upstream into .cache/.
  - Subsequent jobs: `git fetch` the cache, then `git worktree add` the
    requested branch into the per-job dir.
  - After the job finishes: `git worktree remove` the per-job dir. Bare clone
    stays put for the next run.
  - Periodic gc: any worktree dir older than CRAFTING_GC_AGE seconds gets
    pruned (defends against orphans from runner crashes).

Why bare + worktree (not fresh full clones): cargo/maven/gradle caches live
in /caches, but the source tree itself is fast to materialize this way and
leaves zero cross-job contamination. Fresh git clone of a 100MB repo takes
seconds; worktree-add is milliseconds.

Recipe commands run in the worktree dir (subproject path resolved against
the worktree root).
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import shutil
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
|
|
# Module-level logger; cleanup failures are reported through it as warnings
# instead of being raised to the caller.
log = logging.getLogger("crafting_table.workspace")
|
|
|
|
|
|
@dataclass
class WorkspacePaths:
    """Filesystem locations that make up one job's workspace."""

    # Directory that holds everything belonging to one project.
    project_root: Path
    # The project's `.cache/` entry — the shared bare clone.
    cache_dir: Path
    # Worktree directory dedicated to a single job.
    worktree_dir: Path
|
|
|
|
|
|
class WorkspaceManager:
    """Materialize, clean up and garbage-collect per-job git worktrees.

    Every project gets a directory directly under ``root`` that contains one
    shared bare clone (``.cache/``) plus one worktree directory per job.
    """

    # gc() runs `git gc` on a bare clone once its directory mtime is older
    # than this many seconds (7 days).
    _CACHE_GC_QUIET_SECS = 7 * 86400

    # Fetch into remote-tracking refs only; worktrees are detached checkouts
    # of origin/<branch>, so several of them can share one upstream ref.
    _FETCH_REFSPEC = "+refs/heads/*:refs/remotes/origin/*"

    def __init__(self, root: Path):
        """Create the manager and make sure the workspace root exists."""
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)
        # Per-project lock around materialize. Without this, concurrent jobs
        # from the same project race on the bare clone's `git fetch` — git
        # refuses to fetch into a local branch that another worktree has
        # checked out, AND the index file gets contended under parallel
        # fetches even into remote-tracking refs. Lock is per-project (not
        # global) so different projects materialize in parallel.
        self._project_locks: dict[str, asyncio.Lock] = {}
        self._project_locks_guard = asyncio.Lock()

    async def _project_lock(self, project: str) -> asyncio.Lock:
        """Return the lock for ``project``, creating it on first use."""
        async with self._project_locks_guard:
            lock = self._project_locks.get(project)
            if lock is None:
                lock = asyncio.Lock()
                self._project_locks[project] = lock
            return lock

    def paths_for(self, *, project: str, job_id: str) -> WorkspacePaths:
        """Compute the project root, bare-clone dir and worktree dir for a job."""
        project_root = self.root / project
        return WorkspacePaths(
            project_root=project_root,
            cache_dir=project_root / ".cache",
            worktree_dir=project_root / job_id,
        )

    async def materialize(
        self,
        *,
        project: str,
        job_id: str,
        git_url: str,
        branch: str,
        log_fh,
    ) -> WorkspacePaths:
        """Ensure the per-job worktree exists, detached at ``origin/<branch>``.

        Args:
            project: Project name; selects the directory under the root.
            job_id: Job identifier; names the worktree directory.
            git_url: Upstream URL, used only for the first bare clone.
            branch: Upstream branch to check out.
            log_fh: Writable text stream that receives progress lines and
                git output.

        Returns:
            The WorkspacePaths of the materialized worktree.

        Raises:
            RuntimeError: Propagated from ``_git`` when a git step exits
                non-zero — the caller is expected to mark the job failed.
        """
        paths = self.paths_for(project=project, job_id=job_id)
        paths.project_root.mkdir(parents=True, exist_ok=True)
        cache = str(paths.cache_dir)

        # Hold the per-project lock for the whole clone/fetch/worktree-add
        # sequence. Releases when the worktree is on disk; the recipe runs
        # outside the lock so jobs stay parallel during the actual build/test.
        lock = await self._project_lock(project)
        async with lock:
            if not paths.cache_dir.exists():
                log_fh.write(f"[workspace] bare clone {git_url} -> {paths.cache_dir}\n")
                log_fh.flush()
                await _git(
                    ["clone", "--bare", git_url, cache],
                    log_fh,
                    cwd=str(paths.project_root),
                )
                # Configure the bare clone to fetch into remote-tracking refs
                # only, so multiple worktrees can share the same upstream ref.
                await _git(
                    ["config", "remote.origin.fetch", self._FETCH_REFSPEC],
                    log_fh,
                    cwd=cache,
                )
                # `git clone --bare` populates refs/heads/* but NOT
                # refs/remotes/origin/*, so the fetch below is still needed
                # to create the refs that `worktree add` checks out.
            else:
                log_fh.write(f"[workspace] fetching latest into {paths.cache_dir}\n")
                log_fh.flush()

            # Fetch into refs/remotes/origin/*, NOT refs/heads/* — local
            # branches can be held by other worktrees and git refuses to
            # overwrite them. --prune drops branches deleted upstream.
            await _git(
                ["fetch", "--prune", "origin", self._FETCH_REFSPEC],
                log_fh,
                cwd=cache,
            )

            if paths.worktree_dir.exists():
                # A previous run for the same job_id (replay or restart). Wipe it.
                log_fh.write(f"[workspace] removing existing worktree {paths.worktree_dir}\n")
                log_fh.flush()
                await self._cleanup_worktree(paths)
            else:
                # The directory may have been deleted while the bare clone
                # still has it registered; `worktree add` would then refuse
                # the path. Best-effort: a real problem surfaces in the add.
                await self._run_git_quiet(["worktree", "prune"], cwd=paths.cache_dir)

            log_fh.write(f"[workspace] worktree add {paths.worktree_dir} from origin/{branch}\n")
            log_fh.flush()
            await _git(
                ["worktree", "add", "--detach", str(paths.worktree_dir), f"origin/{branch}"],
                log_fh,
                cwd=cache,
            )
        return paths

    async def cleanup(self, paths: WorkspacePaths) -> None:
        """Remove a worktree post-job. Best-effort — failures logged, not raised."""
        try:
            await self._cleanup_worktree(paths)
        except Exception as e:
            log.warning("worktree cleanup failed for %s: %s", paths.worktree_dir, e)

    async def _cleanup_worktree(self, paths: WorkspacePaths) -> None:
        """Remove the job's worktree directory and its git registration."""
        await self._remove_worktree(paths.worktree_dir, paths.cache_dir)

    async def _remove_worktree(self, worktree_dir: Path, cache_dir: Path) -> None:
        """Delete one worktree: `git worktree remove`, then rmtree as fallback.

        Does not take the per-project lock itself, so it can be called from
        code that already holds it (asyncio.Lock is not re-entrant).
        """
        have_cache = cache_dir.exists()
        if have_cache and worktree_dir.exists():
            await self._run_git_quiet(
                ["worktree", "remove", "--force", str(worktree_dir)],
                cwd=cache_dir,
            )
        if worktree_dir.exists():
            # git could not remove it (or there is no bare clone): delete the
            # directory directly, then drop the stale registration so the
            # same path can be added again later.
            shutil.rmtree(worktree_dir, ignore_errors=True)
            if have_cache:
                await self._run_git_quiet(["worktree", "prune"], cwd=cache_dir)

    @staticmethod
    async def _run_git_quiet(args: list[str], *, cwd: Path) -> bool:
        """Run a best-effort git command; return True when it exited 0.

        Output is drained with ``communicate()`` — waiting on a process whose
        stdout pipe is never read can block once the pipe buffer fills.
        Failures are logged at debug level and never raised.
        """
        try:
            proc = await asyncio.create_subprocess_exec(
                "git", *args,
                cwd=str(cwd),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT,
            )
            output, _ = await proc.communicate()
        except Exception as exc:
            log.debug("git %s could not run in %s: %s", args[0], cwd, exc)
            return False
        if proc.returncode != 0:
            log.debug(
                "git %s exited %s in %s: %s",
                args[0],
                proc.returncode,
                cwd,
                output.decode("utf-8", "replace").strip(),
            )
            return False
        return True

    async def gc(self, *, age_secs: int) -> dict:
        """Sweep worktrees older than age_secs. Returns counters.

        Each project is processed under its per-project lock so the sweep
        (and the occasional `git gc --prune=now`) does not run while
        materialize() is fetching into or adding a worktree to the same
        bare clone. Assumes project directories are direct children of the
        root, which is the layout this scan already relies on.

        Returns:
            ``{"scanned": n, "removed": m}`` — ``scanned`` counts every entry
            seen inside project directories (including ``.cache`` and plain
            files); ``removed`` counts worktree directories swept.
        """
        cutoff = time.time() - age_secs
        scanned = 0
        removed = 0
        # Snapshot the listing: entries are deleted while we walk it.
        for project_dir in list(self.root.iterdir()):
            if not project_dir.is_dir():
                continue
            lock = await self._project_lock(project_dir.name)
            async with lock:
                seen, pruned = await self._gc_project(project_dir, cutoff)
            scanned += seen
            removed += pruned
        return {"scanned": scanned, "removed": removed}

    async def _gc_project(self, project_dir: Path, cutoff: float) -> tuple[int, int]:
        """Sweep one project directory; return (entries seen, worktrees removed)."""
        cache_dir = project_dir / ".cache"
        try:
            children = list(project_dir.iterdir())
        except OSError:
            # Project directory vanished or is unreadable; nothing to sweep.
            children = []

        seen = 0
        pruned = 0
        for child in children:
            seen += 1
            if child.name == ".cache" or not child.is_dir():
                continue
            try:
                mtime = child.stat().st_mtime
            except OSError:
                continue
            if mtime > cutoff:
                continue
            # Old worktree — prune.
            await self._remove_worktree(child, cache_dir)
            pruned += 1

        await self._gc_cache_if_quiet(cache_dir)
        return seen, pruned

    async def _gc_cache_if_quiet(self, cache_dir: Path) -> None:
        """Run `git gc` on the bare clone if it has been quiet long enough."""
        try:
            idle_secs = time.time() - cache_dir.stat().st_mtime
        except OSError:
            # No bare clone (or it is unreadable): nothing to collect.
            return
        if idle_secs > self._CACHE_GC_QUIET_SECS:
            await self._run_git_quiet(["gc", "--prune=now", "--quiet"], cwd=cache_dir)
|
|
|
|
|
|
async def _git(args: list[str], log_fh, *, cwd: str | None = None) -> None:
    """Run `git <args>` and stream stdout+stderr to log_fh.

    Args:
        args: Arguments passed to ``git`` (subcommand first).
        log_fh: Writable text stream; each output line is written and
            flushed as it arrives.
        cwd: Working directory for the git process, or None to inherit.

    Raises:
        RuntimeError: If git exits with a non-zero status.
    """
    proc = await asyncio.create_subprocess_exec(
        "git", *args,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    assert proc.stdout is not None
    try:
        # The stream reader yields one line per iteration and stops at EOF.
        async for line in proc.stdout:
            log_fh.write(line.decode("utf-8", "replace"))
            log_fh.flush()
        rc = await proc.wait()
    finally:
        if proc.returncode is None:
            # Streaming was interrupted (task cancelled, log write failed):
            # do not leave an orphaned git process behind.
            try:
                proc.kill()
            except ProcessLookupError:
                # Already exited; only the exit status is left to collect.
                pass
            await proc.wait()
    if rc != 0:
        raise RuntimeError(f"git {args[0]} exited {rc}")
|