Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 50 additions & 19 deletions vero/src/vero/harbor/build/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,25 @@ def _prepare_baseline_repo(agent_repo: Path, dest: Path) -> str:
["git", "-C", str(repo_root), "archive", "HEAD", str(rel)]
if strip else ["git", "-C", str(repo_root), "archive", "HEAD"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
subprocess.run(
["tar", "xf", "-", "--strip-components", str(strip)],
cwd=dest, stdin=archive.stdout, check=True,
)
archive.wait()
try:
subprocess.run(
["tar", "xf", "-", "--strip-components", str(strip)],
cwd=dest, stdin=archive.stdout, check=True,
)
finally:
# Let git see SIGPIPE if tar died, then reap it (no zombie).
if archive.stdout is not None:
archive.stdout.close()
archive_err = (archive.communicate()[1] or b"").decode(errors="replace")
# A failed `git archive` can emit a truncated stream that `tar` still
# accepts with exit 0, baking a near-empty baseline. Fail loudly instead.
if archive.returncode != 0:
raise RuntimeError(
f"git archive failed (exit {archive.returncode}) for {repo_root}: "
f"{archive_err.strip()}"
)
else:
shutil.copytree(agent_repo, dest, dirs_exist_ok=True)

Expand Down Expand Up @@ -205,27 +218,45 @@ def compile_task(
import tempfile

vh = env_dir / "sidecar" / "vero_home"
tmp = Path(tempfile.mkdtemp())
if config.mode == "A":
if not config.dataset:
raise ValueError("Mode A requires a dataset.")
dataset_id = _register(config.dataset, vh, tmp)
else:
if not (config.partition and config.harbor):
raise ValueError("Mode B requires partition + harbor.")
if not (config.inner_task or config.harbor.get("task_source")):
raise ValueError("Mode B requires inner_task (local) or harbor.task_source (registry).")
from vero.harbor.dataset import build_harbor_dataset
# Stage the dataset in a scratch dir that is always cleaned up (datasets can be
# gigabytes; a leaked mkdtemp would accumulate across builds). _register copies
# the dataset into vh before the dir is torn down, so cleanup is safe.
with tempfile.TemporaryDirectory() as tmp_str:
tmp = Path(tmp_str)
if config.mode == "A":
if not config.dataset:
raise ValueError("Mode A requires a dataset.")
dataset_id = _register(config.dataset, vh, tmp)
else:
if not (config.partition and config.harbor):
raise ValueError("Mode B requires partition + harbor.")
if not (config.inner_task or config.harbor.get("task_source")):
raise ValueError("Mode B requires inner_task (local) or harbor.task_source (registry).")
from vero.harbor.dataset import build_harbor_dataset

dataset_id = _register(build_harbor_dataset(config.partition), vh, tmp)
if config.inner_task: # local benchmark -> bake sidecar-only
shutil.copytree(Path(config.inner_task).resolve(), env_dir / "sidecar" / "inner-task")
dataset_id = _register(build_harbor_dataset(config.partition), vh, tmp)
if config.inner_task: # local benchmark -> bake sidecar-only
shutil.copytree(Path(config.inner_task).resolve(), env_dir / "sidecar" / "inner-task")

# 4. ServeConfig (compiler <-> serve contract)
(env_dir / "sidecar" / "serve.json").write_text(
json.dumps(_serve_config(config, dataset_id, base_commit), indent=2)
)

# 4b. Fail early if a declared secret is missing from the host env, so the
# operator finds out at build time rather than via a credential-less
# sidecar. The compose ${VAR:?} guard is the run-time backstop.
import os

if not os.environ.get("VERO_SKIP_SECRET_CHECK"):
missing = [s for s in config.secrets if not os.environ.get(s)]
if missing:
raise ValueError(
"Declared secrets missing from the host environment: "
f"{', '.join(missing)}. Set them, or set VERO_SKIP_SECRET_CHECK=1 "
"to defer to the run-time compose check."
)
Comment on lines +246 to +258

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Secret check runs after expensive build steps

The presence check for declared secrets (step 4b) executes after _prepare_baseline_repo (git archive of the agent repo) and the dataset staging block, both of which can take minutes for large repos or multi-gigabyte datasets. If a required secret is absent, the operator waits through those steps before learning about the missing credential. Moving this check to the top of compile_task β€” before step 1 β€” would fail fast and avoid the partial-state output directory that is left behind on failure.

Prompt To Fix With AI
This is a comment left during a code review.
Path: vero/src/vero/harbor/build/compiler.py
Line: 246-258

Comment:
**Secret check runs after expensive build steps**

The presence check for declared secrets (step 4b) executes after `_prepare_baseline_repo` (git archive of the agent repo) and the dataset staging block, both of which can take minutes for large repos or multi-gigabyte datasets. If a required secret is absent, the operator waits through those steps before learning about the missing credential. Moving this check to the top of `compile_task` β€” before step 1 β€” would fail fast and avoid the partial-state output directory that is left behind on failure.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Claude Code Fix in Codex


# 5. render templates
jenv = Environment(
loader=FileSystemLoader(str(_TEMPLATES)),
Expand Down
4 changes: 3 additions & 1 deletion vero/src/vero/harbor/build/templates/docker-compose.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ services:
environment:
VERO_HOME_DIR: "/opt/vero_home"
{% for secret in secrets %}
{{ secret }}: "${{ '{' }}{{ secret }}{{ '}' }}"
{# Fail-fast: docker compose aborts if this host var is unset/empty, rather
than silently baking an empty credential into the sidecar. #}
{{ secret }}: "${{ '{' }}{{ secret }}:?{{ secret }} must be set in the host environment for the eval sidecar{{ '}' }}"
{% endfor %}
volumes:
- agent_repo:/work/agent:ro
Expand Down
9 changes: 8 additions & 1 deletion vero/src/vero/harbor/build/templates/seed.sh.j2
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@ fi
# Whole repo is the optimizer's to edit...
chown -R agent:agent /work/agent
git config --system --add safe.directory /work/agent
# NOTE: read_only_paths is ADVISORY ONLY. The chmod/chown below locks paths in
# the live working tree (/work/agent), but the verifier checks out git blobs from
# the agent's commit, not this working tree, so an optimizer can still stage edits
# to a "locked" scorer path. Authoritative scorer-provenance is enforced
# sidecar-side: in Mode A the verifier scores with the sidecar-baked task project
# (task_project), never the scorer in the agent's commit. Treat this purely as a
# guardrail / footgun-reducer, not security.
{% for p in read_only_paths %}
# ...except locked paths (e.g. the scorer): root-owned + unwritable.
# ...except locked paths (e.g. the scorer): root-owned + unwritable (advisory).
if [ -e "/work/agent/{{ p }}" ]; then
chown -R root:root "/work/agent/{{ p }}"
chmod -R a-w "/work/agent/{{ p }}"
Expand Down
44 changes: 38 additions & 6 deletions vero/src/vero/harbor/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ async def produce_sample_results(
return
jobs_dir = Path(result_dir) / "jobs"

# Resume: only run tasks without an already-persisted SampleResult.
pending = [(sid, t) for sid, t in pairs if self._existing(params, sid) is None]
# Resume: only run tasks not already completed successfully. A persisted
# *error* sample (transient harbor/verifier failure) is NOT done, so it is
# re-run rather than permanently skipped.
pending = [(sid, t) for sid, t in pairs if not self._is_done(params, sid)]
if pending:
await self._run_harbor(
str(workspace.project_path), params, [t for _, t in pending], jobs_dir
Expand Down Expand Up @@ -97,6 +99,8 @@ def _build_command(
"--agent-import-path", c.agent_import_path,
"-e", c.environment,
"-n", str(params.max_concurrency),
"--n-attempts", str(c.n_attempts),
"--max-retries", str(c.max_retries),
]
if c.model:
cmd += ["-m", c.model]
Expand Down Expand Up @@ -136,8 +140,8 @@ def _collate(
) -> None:
trials = self._load_trials(jobs_dir) # {task_name: result_dict}
for sample_id, task_name in pairs:
if self._existing(params, sample_id) is not None:
continue # already collated (resume)
if self._is_done(params, sample_id):
continue # already collated successfully (resume); errors are redone
sample_result = self._sample_result(
trials.get(task_name), sample_id, task_name, params
)
Expand All @@ -155,17 +159,39 @@ def _load_trials(self, jobs_dir: Path) -> dict[str, dict]:
return trials
# Trial result.json files live at <jobs>/<timestamp>/<trial>/result.json; the
# job-level <jobs>/<timestamp>/result.json carries no task_name, so recurse and
# key on task_name (skipping the job summary).
# key on task_name (skipping the job summary). A task may have several trials
# (retries / multiple attempts); rglob order is undefined, so keep the BEST
# trial per task deterministically rather than last-write-wins (a failing
# retry must never clobber a passing trial).
best_rank: dict[str, tuple] = {}
for result_json in jobs_dir.rglob("result.json"):
try:
data = json.loads(result_json.read_text())
except (json.JSONDecodeError, OSError):
continue
task_name = data.get("task_name")
if task_name:
if not task_name:
continue
rank = self._trial_rank(data, result_json)
if task_name not in best_rank or rank > best_rank[task_name]:
best_rank[task_name] = rank
trials[task_name] = data
return trials

@staticmethod
def _trial_rank(data: dict, result_json: Path) -> tuple:
"""Sort key for picking the best of several trials of one task. Higher wins:
prefer a clean trial with rewards, then any trial with rewards, then the most
recent attempt (finished_at, falling back to file mtime)."""
has_rewards = bool((data.get("verifier_result") or {}).get("rewards"))
clean = has_rewards and not data.get("exception_info")
finished_at = data.get("finished_at") or ""
try:
mtime = result_json.stat().st_mtime
except OSError:
mtime = 0.0
return (clean, has_rewards, finished_at, mtime)

def _sample_result(
self,
trial: dict | None,
Expand Down Expand Up @@ -218,3 +244,9 @@ def _existing(self, params: EvaluationParameters, sample_id: int) -> SampleResul
params.result_id,
sample_id,
)

def _is_done(self, params: EvaluationParameters, sample_id: int) -> bool:
"""A sample is done only if a persisted result exists AND is not an error;
a transiently-failed sample must be re-run on resume, not skipped."""
existing = self._existing(params, sample_id)
return existing is not None and not existing.is_error()
42 changes: 40 additions & 2 deletions vero/tests/test_harbor_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def _dataset(root: Path) -> Path:


@pytest.fixture
def built(tmp_path):
def built(tmp_path, monkeypatch):
monkeypatch.setenv("VERO_SKIP_SECRET_CHECK", "1")
config = BuildConfig(
name="vero/gsm8k-opt",
description="optimize gsm8k",
Expand Down Expand Up @@ -109,7 +110,9 @@ def test_rendered_files_parse(built):
assert "eval-sidecar" in compose["services"]
assert compose["services"]["main"]["depends_on"]["eval-sidecar"]["condition"] == "service_healthy"
# secret reaches the sidecar only, via host-resolved compose interpolation
assert compose["services"]["eval-sidecar"]["environment"]["OPENAI_API_KEY"] == "${OPENAI_API_KEY}"
# with a fail-fast guard (${VAR:?msg}) so an unset host var aborts the run.
sidecar_secret = compose["services"]["eval-sidecar"]["environment"]["OPENAI_API_KEY"]
assert sidecar_secret.startswith("${OPENAI_API_KEY:?")
assert "OPENAI_API_KEY" not in compose["services"]["main"].get("environment", {})


Expand All @@ -129,3 +132,38 @@ def head(p):
assert head("environment/agent-baseline") == head("environment/agent-seed")
cfg = json.loads((built / "environment/sidecar/serve.json").read_text())
assert cfg["base_commit"] == head("environment/agent-baseline")


def test_baseline_archive_failure_raises(tmp_path, monkeypatch):
monkeypatch.setenv("VERO_SKIP_SECRET_CHECK", "1")
# An empty git repo with no commits: `git archive HEAD` exits nonzero.
bad = tmp_path / "emptyrepo"
(bad / "src").mkdir(parents=True)
(bad / "pyproject.toml").write_text("[project]\nname='x'\nversion='0'\n")
subprocess.run(["git", "init", "-q"], cwd=bad, check=True)
config = BuildConfig(
name="vero/x", agent_repo=str(bad), mode="A", task="gsm8k",
dataset=str(_dataset(tmp_path)),
splits=[{"split": "validation", "access": "non_viewable"}],
)
with pytest.raises(RuntimeError, match="git archive failed"):
compile_task(config, tmp_path / "task", vero_root=_stub_vero(tmp_path))


def test_missing_secret_fails_build(tmp_path, monkeypatch):
monkeypatch.delenv("VERO_SKIP_SECRET_CHECK", raising=False)
monkeypatch.delenv("DEFINITELY_MISSING_SECRET", raising=False)
config = BuildConfig(
name="vero/x", agent_repo=str(_agent_repo(tmp_path)), mode="A", task="gsm8k",
dataset=str(_dataset(tmp_path)),
splits=[{"split": "validation", "access": "non_viewable"}],
secrets=["DEFINITELY_MISSING_SECRET"],
)
with pytest.raises(ValueError, match="missing from the host environment"):
compile_task(config, tmp_path / "task", vero_root=_stub_vero(tmp_path))


def test_seed_documents_advisory_read_only(built):
seed = (built / "environment/main/seed.sh").read_text()
assert "ADVISORY ONLY" in seed
assert "sidecar-side" in seed
86 changes: 86 additions & 0 deletions vero/tests/test_harbor_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,89 @@ async def test_resume_only_runs_pending(self, tmp_path, monkeypatch):

# only the pending task name was passed to harbor
assert runner._run_harbor.await_args.args[2] == ["t1"]


class TestReviewFixes:
def test_emits_attempts_and_retries(self):
runner = HarborRunner(
HarborConfig(
task_source="org/ds@1",
agent_import_path="pkg.mod:Agent",
model="anthropic/x",
environment="modal",
n_attempts=3,
max_retries=5,
)
)
cmd = runner._build_command("/wt", _params(), ["t0"], Path("/jobs"))
assert "--n-attempts" in cmd and cmd[cmd.index("--n-attempts") + 1] == "3"
assert "--max-retries" in cmd and cmd[cmd.index("--max-retries") + 1] == "5"

def test_passing_trial_wins_over_later_failing_retry(self, tmp_path):
runner = _runner()
jobs = tmp_path / "jobs"
run = jobs / "2026-01-01__00-00-00"
# passing trial, earlier finished_at
good = run / "trial0"
good.mkdir(parents=True)
(good / "result.json").write_text(json.dumps({
"task_name": "t0", "trial_name": "trial0", "finished_at": "2026-01-01T00:01:00",
"verifier_result": {"rewards": {"pass": 1.0}},
}))
# failing retry, later finished_at + exception_info; written second (newer mtime)
bad = run / "trial1"
bad.mkdir(parents=True)
(bad / "result.json").write_text(json.dumps({
"task_name": "t0", "trial_name": "trial1", "finished_at": "2026-01-01T00:09:00",
"exception_info": {"exception_type": "RuntimeError", "exception_message": "boom",
"exception_traceback": ""},
"verifier_result": None,
}))
trials = runner._load_trials(jobs)
assert trials["t0"]["trial_name"] == "trial0"
assert (trials["t0"]["verifier_result"] or {}).get("rewards") == {"pass": 1.0}

def test_latest_attempt_wins_when_both_clean(self, tmp_path):
runner = _runner()
jobs = tmp_path / "jobs"
run = jobs / "2026-01-01__00-00-00"
early = run / "a"
early.mkdir(parents=True)
(early / "result.json").write_text(json.dumps({
"task_name": "t0", "trial_name": "early", "finished_at": "2026-01-01T00:01:00",
"verifier_result": {"rewards": {"pass": 0.0}},
}))
late = run / "b"
late.mkdir(parents=True)
(late / "result.json").write_text(json.dumps({
"task_name": "t0", "trial_name": "late", "finished_at": "2026-01-01T00:05:00",
"verifier_result": {"rewards": {"pass": 1.0}},
}))
trials = runner._load_trials(jobs)
assert trials["t0"]["trial_name"] == "late"

@pytest.mark.asyncio
async def test_resume_reruns_persisted_error_sample(self, tmp_path, monkeypatch):
monkeypatch.setenv("VERO_HOME_DIR", str(tmp_path / "vh"))
runner = _runner()
params = _params()
result_dir = tmp_path / "result"
# sample 0 previously errored (transient failure)
save_sample_result(
get_vero_home_dir() / "sessions", "s", params.result_id, sample_id=0,
result=SampleResult(
dataset_sample=DatasetSample(sample_id=0, split="test", dataset_id="ds"),
error="transient harbor failure", commit="c1", result_id=params.result_id,
),
)
# a good trial for t0 now exists
_write_trial(result_dir / "jobs", "trial0", "t0", {"pass": 1.0})
monkeypatch.setattr(runner, "_task_names_for", lambda p: [(0, "t0")])
runner._run_harbor = AsyncMock()
ws = MagicMock(project_path="/wt")
await runner.produce_sample_results(workspace=ws, params=params, result_dir=result_dir)
# error sample was treated as pending (re-run) and re-collated to a score
assert runner._run_harbor.await_args.args[2] == ["t0"]
results = load_all_sample_results(get_vero_home_dir() / "sessions", "s", params.result_id)
assert results[0].error is None
assert results[0].score == 1.0