From 1115079b26eff545f7ecfc789397071324a125b7 Mon Sep 17 00:00:00 2001 From: Shehab Yasser Date: Tue, 30 Jun 2026 13:23:50 +0300 Subject: [PATCH] fix(harbor): nested-run flags/dedup/resume, archive + secret guards, mkdtemp cleanup Mode B runner + build-compiler robustness (review findings on PR #5): - runner: emit --n-attempts / --max-retries (the typed HarborConfig fields were silently dropped); pick the best trial per task deterministically instead of last-write-wins over an unordered rglob; treat a persisted error sample as not-done so a transient failure is re-run on resume. - compiler: assert git archive exited 0 (and reap it) so a truncated stream cannot bake a near-empty baseline; validate declared secrets at build time and render compose secrets with a fail-fast guard so an unset host var fails loudly instead of producing a credential-less sidecar. - seed.sh: document that read_only_paths is advisory only and scorer provenance is enforced sidecar-side. - compiler: stage the dataset in a cleaned-up TemporaryDirectory (Greptile: the mkdtemp scratch dir was leaking, datasets can be gigabytes). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- vero/src/vero/harbor/build/compiler.py | 69 +++++++++++---- .../build/templates/docker-compose.yaml.j2 | 4 +- .../vero/harbor/build/templates/seed.sh.j2 | 9 +- vero/src/vero/harbor/runner.py | 44 ++++++++-- vero/tests/test_harbor_build.py | 42 ++++++++- vero/tests/test_harbor_runner.py | 86 +++++++++++++++++++ 6 files changed, 225 insertions(+), 29 deletions(-) diff --git a/vero/src/vero/harbor/build/compiler.py b/vero/src/vero/harbor/build/compiler.py index 6151ca0..b667113 100644 --- a/vero/src/vero/harbor/build/compiler.py +++ b/vero/src/vero/harbor/build/compiler.py @@ -90,12 +90,25 @@ def _prepare_baseline_repo(agent_repo: Path, dest: Path) -> str: ["git", "-C", str(repo_root), "archive", "HEAD", str(rel)] if strip else ["git", "-C", str(repo_root), "archive", "HEAD"], stdout=subprocess.PIPE, + stderr=subprocess.PIPE, ) - subprocess.run( - ["tar", "xf", "-", "--strip-components", str(strip)], - cwd=dest, stdin=archive.stdout, check=True, - ) - archive.wait() + try: + subprocess.run( + ["tar", "xf", "-", "--strip-components", str(strip)], + cwd=dest, stdin=archive.stdout, check=True, + ) + finally: + # Let git see SIGPIPE if tar died, then reap it (no zombie). + if archive.stdout is not None: + archive.stdout.close() + archive_err = (archive.communicate()[1] or b"").decode(errors="replace") + # A failed `git archive` can emit a truncated stream that `tar` still + # accepts with exit 0, baking a near-empty baseline. Fail loudly instead. 
+ if archive.returncode != 0: + raise RuntimeError( + f"git archive failed (exit {archive.returncode}) for {repo_root}: " + f"{archive_err.strip()}" + ) else: shutil.copytree(agent_repo, dest, dirs_exist_ok=True) @@ -205,27 +218,45 @@ def compile_task( import tempfile vh = env_dir / "sidecar" / "vero_home" - tmp = Path(tempfile.mkdtemp()) - if config.mode == "A": - if not config.dataset: - raise ValueError("Mode A requires a dataset.") - dataset_id = _register(config.dataset, vh, tmp) - else: - if not (config.partition and config.harbor): - raise ValueError("Mode B requires partition + harbor.") - if not (config.inner_task or config.harbor.get("task_source")): - raise ValueError("Mode B requires inner_task (local) or harbor.task_source (registry).") - from vero.harbor.dataset import build_harbor_dataset + # Stage the dataset in a scratch dir that is always cleaned up (datasets can be + # gigabytes; a leaked mkdtemp would accumulate across builds). _register copies + # the dataset into vh before the dir is torn down, so cleanup is safe. 
+ with tempfile.TemporaryDirectory() as tmp_str: + tmp = Path(tmp_str) + if config.mode == "A": + if not config.dataset: + raise ValueError("Mode A requires a dataset.") + dataset_id = _register(config.dataset, vh, tmp) + else: + if not (config.partition and config.harbor): + raise ValueError("Mode B requires partition + harbor.") + if not (config.inner_task or config.harbor.get("task_source")): + raise ValueError("Mode B requires inner_task (local) or harbor.task_source (registry).") + from vero.harbor.dataset import build_harbor_dataset - dataset_id = _register(build_harbor_dataset(config.partition), vh, tmp) - if config.inner_task: # local benchmark -> bake sidecar-only - shutil.copytree(Path(config.inner_task).resolve(), env_dir / "sidecar" / "inner-task") + dataset_id = _register(build_harbor_dataset(config.partition), vh, tmp) + if config.inner_task: # local benchmark -> bake sidecar-only + shutil.copytree(Path(config.inner_task).resolve(), env_dir / "sidecar" / "inner-task") # 4. ServeConfig (compiler <-> serve contract) (env_dir / "sidecar" / "serve.json").write_text( json.dumps(_serve_config(config, dataset_id, base_commit), indent=2) ) + # 4b. Fail early if a declared secret is missing from the host env, so the + # operator finds out at build time rather than via a credential-less + # sidecar. The compose ${VAR:?} guard is the run-time backstop. + import os + + if not os.environ.get("VERO_SKIP_SECRET_CHECK"): + missing = [s for s in config.secrets if not os.environ.get(s)] + if missing: + raise ValueError( + "Declared secrets missing from the host environment: " + f"{', '.join(missing)}. Set them, or set VERO_SKIP_SECRET_CHECK=1 " + "to defer to the run-time compose check." + ) + # 5. 
render templates jenv = Environment( loader=FileSystemLoader(str(_TEMPLATES)), diff --git a/vero/src/vero/harbor/build/templates/docker-compose.yaml.j2 b/vero/src/vero/harbor/build/templates/docker-compose.yaml.j2 index 78f026c..9ec2b40 100644 --- a/vero/src/vero/harbor/build/templates/docker-compose.yaml.j2 +++ b/vero/src/vero/harbor/build/templates/docker-compose.yaml.j2 @@ -24,7 +24,9 @@ services: environment: VERO_HOME_DIR: "/opt/vero_home" {% for secret in secrets %} - {{ secret }}: "${{ '{' }}{{ secret }}{{ '}' }}" + {# Fail-fast: docker compose aborts if this host var is unset/empty, rather + than silently baking an empty credential into the sidecar. #} + {{ secret }}: "${{ '{' }}{{ secret }}:?{{ secret }} must be set in the host environment for the eval sidecar{{ '}' }}" {% endfor %} volumes: - agent_repo:/work/agent:ro diff --git a/vero/src/vero/harbor/build/templates/seed.sh.j2 b/vero/src/vero/harbor/build/templates/seed.sh.j2 index c284211..6e16ab2 100644 --- a/vero/src/vero/harbor/build/templates/seed.sh.j2 +++ b/vero/src/vero/harbor/build/templates/seed.sh.j2 @@ -10,8 +10,15 @@ fi # Whole repo is the optimizer's to edit... chown -R agent:agent /work/agent git config --system --add safe.directory /work/agent +# NOTE: read_only_paths is ADVISORY ONLY. The chmod/chown below locks paths in +# the live working tree (/work/agent), but the verifier checks out git blobs from +# the agent's commit, not this working tree, so an optimizer can still stage edits +# to a "locked" scorer path. Authoritative scorer-provenance is enforced +# sidecar-side: in Mode A the verifier scores with the sidecar-baked task project +# (task_project), never the scorer in the agent's commit. Treat this purely as a +# guardrail / footgun-reducer, not security. {% for p in read_only_paths %} -# ...except locked paths (e.g. the scorer): root-owned + unwritable. +# ...except locked paths (e.g. the scorer): root-owned + unwritable (advisory). 
if [ -e "/work/agent/{{ p }}" ]; then chown -R root:root "/work/agent/{{ p }}" chmod -R a-w "/work/agent/{{ p }}" diff --git a/vero/src/vero/harbor/runner.py b/vero/src/vero/harbor/runner.py index ca11a5d..8b63038 100644 --- a/vero/src/vero/harbor/runner.py +++ b/vero/src/vero/harbor/runner.py @@ -50,8 +50,10 @@ async def produce_sample_results( return jobs_dir = Path(result_dir) / "jobs" - # Resume: only run tasks without an already-persisted SampleResult. - pending = [(sid, t) for sid, t in pairs if self._existing(params, sid) is None] + # Resume: only run tasks not already completed successfully. A persisted + # *error* sample (transient harbor/verifier failure) is NOT done, so it is + # re-run rather than permanently skipped. + pending = [(sid, t) for sid, t in pairs if not self._is_done(params, sid)] if pending: await self._run_harbor( str(workspace.project_path), params, [t for _, t in pending], jobs_dir @@ -97,6 +99,8 @@ def _build_command( "--agent-import-path", c.agent_import_path, "-e", c.environment, "-n", str(params.max_concurrency), + "--n-attempts", str(c.n_attempts), + "--max-retries", str(c.max_retries), ] if c.model: cmd += ["-m", c.model] @@ -136,8 +140,8 @@ def _collate( ) -> None: trials = self._load_trials(jobs_dir) # {task_name: result_dict} for sample_id, task_name in pairs: - if self._existing(params, sample_id) is not None: - continue # already collated (resume) + if self._is_done(params, sample_id): + continue # already collated successfully (resume); errors are redone sample_result = self._sample_result( trials.get(task_name), sample_id, task_name, params ) @@ -155,17 +159,39 @@ def _load_trials(self, jobs_dir: Path) -> dict[str, dict]: return trials # Trial result.json files live at {jobs_dir}/{job}/{trial}/result.json; the # job-level {jobs_dir}/{job}/result.json carries no task_name, so recurse and - # key on task_name (skipping the job summary). 
A task may have several trials + # (retries / multiple attempts); rglob order is undefined, so keep the BEST + # trial per task deterministically rather than last-write-wins (a failing + # retry must never clobber a passing trial). + best_rank: dict[str, tuple] = {} for result_json in jobs_dir.rglob("result.json"): try: data = json.loads(result_json.read_text()) except (json.JSONDecodeError, OSError): continue task_name = data.get("task_name") - if task_name: + if not task_name: + continue + rank = self._trial_rank(data, result_json) + if task_name not in best_rank or rank > best_rank[task_name]: + best_rank[task_name] = rank trials[task_name] = data return trials + @staticmethod + def _trial_rank(data: dict, result_json: Path) -> tuple: + """Sort key for picking the best of several trials of one task. Higher wins: + prefer a clean trial with rewards, then any trial with rewards, then the most + recent attempt (finished_at, falling back to file mtime).""" + has_rewards = bool((data.get("verifier_result") or {}).get("rewards")) + clean = has_rewards and not data.get("exception_info") + finished_at = data.get("finished_at") or "" + try: + mtime = result_json.stat().st_mtime + except OSError: + mtime = 0.0 + return (clean, has_rewards, finished_at, mtime) + def _sample_result( self, trial: dict | None, @@ -218,3 +244,9 @@ def _existing(self, params: EvaluationParameters, sample_id: int) -> SampleResul params.result_id, sample_id, ) + + def _is_done(self, params: EvaluationParameters, sample_id: int) -> bool: + """A sample is done only if a persisted result exists AND is not an error; + a transiently-failed sample must be re-run on resume, not skipped.""" + existing = self._existing(params, sample_id) + return existing is not None and not existing.is_error() diff --git a/vero/tests/test_harbor_build.py b/vero/tests/test_harbor_build.py index 999589b..27416af 100644 --- a/vero/tests/test_harbor_build.py +++ b/vero/tests/test_harbor_build.py @@ -56,7 +56,8 @@ def 
_dataset(root: Path) -> Path: @pytest.fixture -def built(tmp_path): +def built(tmp_path, monkeypatch): + monkeypatch.setenv("VERO_SKIP_SECRET_CHECK", "1") config = BuildConfig( name="vero/gsm8k-opt", description="optimize gsm8k", @@ -109,7 +110,9 @@ def test_rendered_files_parse(built): assert "eval-sidecar" in compose["services"] assert compose["services"]["main"]["depends_on"]["eval-sidecar"]["condition"] == "service_healthy" # secret reaches the sidecar only, via host-resolved compose interpolation - assert compose["services"]["eval-sidecar"]["environment"]["OPENAI_API_KEY"] == "${OPENAI_API_KEY}" + # with a fail-fast guard (${VAR:?msg}) so an unset host var aborts the run. + sidecar_secret = compose["services"]["eval-sidecar"]["environment"]["OPENAI_API_KEY"] + assert sidecar_secret.startswith("${OPENAI_API_KEY:?") assert "OPENAI_API_KEY" not in compose["services"]["main"].get("environment", {}) @@ -129,3 +132,38 @@ def head(p): assert head("environment/agent-baseline") == head("environment/agent-seed") cfg = json.loads((built / "environment/sidecar/serve.json").read_text()) assert cfg["base_commit"] == head("environment/agent-baseline") + + +def test_baseline_archive_failure_raises(tmp_path, monkeypatch): + monkeypatch.setenv("VERO_SKIP_SECRET_CHECK", "1") + # An empty git repo with no commits: `git archive HEAD` exits nonzero. 
+ bad = tmp_path / "emptyrepo" + (bad / "src").mkdir(parents=True) + (bad / "pyproject.toml").write_text("[project]\nname='x'\nversion='0'\n") + subprocess.run(["git", "init", "-q"], cwd=bad, check=True) + config = BuildConfig( + name="vero/x", agent_repo=str(bad), mode="A", task="gsm8k", + dataset=str(_dataset(tmp_path)), + splits=[{"split": "validation", "access": "non_viewable"}], + ) + with pytest.raises(RuntimeError, match="git archive failed"): + compile_task(config, tmp_path / "task", vero_root=_stub_vero(tmp_path)) + + +def test_missing_secret_fails_build(tmp_path, monkeypatch): + monkeypatch.delenv("VERO_SKIP_SECRET_CHECK", raising=False) + monkeypatch.delenv("DEFINITELY_MISSING_SECRET", raising=False) + config = BuildConfig( + name="vero/x", agent_repo=str(_agent_repo(tmp_path)), mode="A", task="gsm8k", + dataset=str(_dataset(tmp_path)), + splits=[{"split": "validation", "access": "non_viewable"}], + secrets=["DEFINITELY_MISSING_SECRET"], + ) + with pytest.raises(ValueError, match="missing from the host environment"): + compile_task(config, tmp_path / "task", vero_root=_stub_vero(tmp_path)) + + +def test_seed_documents_advisory_read_only(built): + seed = (built / "environment/main/seed.sh").read_text() + assert "ADVISORY ONLY" in seed + assert "sidecar-side" in seed diff --git a/vero/tests/test_harbor_runner.py b/vero/tests/test_harbor_runner.py index 15df89e..b81a80a 100644 --- a/vero/tests/test_harbor_runner.py +++ b/vero/tests/test_harbor_runner.py @@ -126,3 +126,89 @@ async def test_resume_only_runs_pending(self, tmp_path, monkeypatch): # only the pending task name was passed to harbor assert runner._run_harbor.await_args.args[2] == ["t1"] + + +class TestReviewFixes: + def test_emits_attempts_and_retries(self): + runner = HarborRunner( + HarborConfig( + task_source="org/ds@1", + agent_import_path="pkg.mod:Agent", + model="anthropic/x", + environment="modal", + n_attempts=3, + max_retries=5, + ) + ) + cmd = runner._build_command("/wt", _params(), 
["t0"], Path("/jobs")) + assert "--n-attempts" in cmd and cmd[cmd.index("--n-attempts") + 1] == "3" + assert "--max-retries" in cmd and cmd[cmd.index("--max-retries") + 1] == "5" + + def test_passing_trial_wins_over_later_failing_retry(self, tmp_path): + runner = _runner() + jobs = tmp_path / "jobs" + run = jobs / "2026-01-01__00-00-00" + # passing trial, earlier finished_at + good = run / "trial0" + good.mkdir(parents=True) + (good / "result.json").write_text(json.dumps({ + "task_name": "t0", "trial_name": "trial0", "finished_at": "2026-01-01T00:01:00", + "verifier_result": {"rewards": {"pass": 1.0}}, + })) + # failing retry, later finished_at + exception_info; written second (newer mtime) + bad = run / "trial1" + bad.mkdir(parents=True) + (bad / "result.json").write_text(json.dumps({ + "task_name": "t0", "trial_name": "trial1", "finished_at": "2026-01-01T00:09:00", + "exception_info": {"exception_type": "RuntimeError", "exception_message": "boom", + "exception_traceback": ""}, + "verifier_result": None, + })) + trials = runner._load_trials(jobs) + assert trials["t0"]["trial_name"] == "trial0" + assert (trials["t0"]["verifier_result"] or {}).get("rewards") == {"pass": 1.0} + + def test_latest_attempt_wins_when_both_clean(self, tmp_path): + runner = _runner() + jobs = tmp_path / "jobs" + run = jobs / "2026-01-01__00-00-00" + early = run / "a" + early.mkdir(parents=True) + (early / "result.json").write_text(json.dumps({ + "task_name": "t0", "trial_name": "early", "finished_at": "2026-01-01T00:01:00", + "verifier_result": {"rewards": {"pass": 0.0}}, + })) + late = run / "b" + late.mkdir(parents=True) + (late / "result.json").write_text(json.dumps({ + "task_name": "t0", "trial_name": "late", "finished_at": "2026-01-01T00:05:00", + "verifier_result": {"rewards": {"pass": 1.0}}, + })) + trials = runner._load_trials(jobs) + assert trials["t0"]["trial_name"] == "late" + + @pytest.mark.asyncio + async def test_resume_reruns_persisted_error_sample(self, tmp_path, 
monkeypatch): + monkeypatch.setenv("VERO_HOME_DIR", str(tmp_path / "vh")) + runner = _runner() + params = _params() + result_dir = tmp_path / "result" + # sample 0 previously errored (transient failure) + save_sample_result( + get_vero_home_dir() / "sessions", "s", params.result_id, sample_id=0, + result=SampleResult( + dataset_sample=DatasetSample(sample_id=0, split="test", dataset_id="ds"), + error="transient harbor failure", commit="c1", result_id=params.result_id, + ), + ) + # a good trial for t0 now exists + _write_trial(result_dir / "jobs", "trial0", "t0", {"pass": 1.0}) + monkeypatch.setattr(runner, "_task_names_for", lambda p: [(0, "t0")]) + runner._run_harbor = AsyncMock() + ws = MagicMock(project_path="/wt") + await runner.produce_sample_results(workspace=ws, params=params, result_dir=result_dir) + # error sample was treated as pending (re-run) and re-collated to a score + assert runner._run_harbor.await_args.args[2] == ["t0"] + results = load_all_sample_results(get_vero_home_dir() / "sessions", "s", params.result_id) + assert results[0].error is None + assert results[0].score == 1.0