Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions vero/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ dependencies = [
"datasets>=4.3.0",
"pydantic>=2.11.7",
"python-dotenv>=1.2.2",
"pyyaml>=6.0",
"requests>=2.32.5",
"rich>=13.9.4",
"s3fs>=2025.9.0",
"tenacity>=9.1.2",
"toml>=0.10.2",
Expand All @@ -37,6 +39,12 @@ docker = [
claude = [
"claude-agent-sdk>=0.1.56",
]
harbor = [
"fastapi>=0.110",
"uvicorn>=0.27",
"httpx>=0.27",
"jinja2>=3.1.6",
]
optimize = [
"async-lru>=2.0.5",
"beautifulsoup4>=4.14.2",
Expand Down
201 changes: 201 additions & 0 deletions vero/src/vero/core/budget.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
"""Per-split evaluation budgets and the ledger that meters them.

``SplitBudget`` is the public, stateful budget for one (split, dataset_id) pair.
``BudgetLedger`` owns a set of them — the keys also form the allowlist of
evaluable combinations. The ledger is in-memory by default (as used by the
in-process ``ExperimentRunnerTool``); with a ``persist_path`` it flushes every
mutation to durable JSON under a single-writer lock (as used by the Harbor
eval sidecar).
"""

from __future__ import annotations

import asyncio
import json
import logging
from dataclasses import dataclass, field
from pathlib import Path

from vero.exceptions import ExperimentBudgetExceeded, InvalidSplitError

logger = logging.getLogger(__name__)


@dataclass
class SplitBudget:
    """Stateful evaluation budget for one (split, dataset_id) pair.

    At least one of ``total_sample_budget`` / ``total_run_budget`` must be given.
    A limit left as ``None`` is not enforced. The ``remaining_*`` counters start
    at the matching totals and are changed only by the ``decrement_*`` methods.
    """

    split: str
    dataset_id: str = ""
    # Samples that may be spent across all runs (None = not enforced).
    total_sample_budget: int | None = None
    remaining_sample_budget: int | None = field(init=False)
    # Number of runs that may be started (None = not enforced).
    total_run_budget: int | None = None
    remaining_run_budget: int | None = field(init=False)
    # Cap on the samples a single run may request (None = not enforced).
    max_samples_per_run: int | None = None

    def __repr__(self) -> str:
        """Render the configured limits, omitting any that are ``None``."""
        repr_items = [
            ("split", self.split),
            ("dataset_id", self.dataset_id),
            ("total_sample_budget", self.total_sample_budget),
            ("total_run_budget", self.total_run_budget),
        ]
        shown = ", ".join(
            f"{name}={value}" for name, value in repr_items if value is not None
        )
        return f"SplitBudget({shown})"

    def __post_init__(self) -> None:
        """Validate the limits and initialise the remaining counters.

        Raises:
            AssertionError: if neither total budget is given, or if a limit is
                neither an ``int`` nor ``None``.
        """
        # Raised explicitly rather than with ``assert`` so the checks still run
        # under ``python -O``; AssertionError is kept so callers see the same
        # exception type as before.
        if self.total_sample_budget is None and self.total_run_budget is None:
            raise AssertionError(
                "Either total sample budget or total run budget must be provided."
            )
        self.remaining_sample_budget = self.total_sample_budget
        self.remaining_run_budget = self.total_run_budget

        for name in ("total_sample_budget", "total_run_budget", "max_samples_per_run"):
            value = getattr(self, name)
            if value is not None and not isinstance(value, int):
                raise AssertionError(
                    f"{name} must be an int or None, got {type(value).__name__}."
                )

    def has_run_budget(self) -> bool:
        """Return True if another run may start (always True when unmetered)."""
        return self.remaining_run_budget is None or self.remaining_run_budget > 0

    def decrement_run_budget(self) -> None:
        """Spend one run; a no-op when runs are unmetered."""
        if self.remaining_run_budget is not None:
            self.remaining_run_budget -= 1

    def has_sample_budget(self, num_samples: int) -> bool:
        """Return True if ``num_samples`` fits in the remaining sample budget."""
        return (
            self.remaining_sample_budget is None
            or self.remaining_sample_budget >= num_samples
        )

    def decrement_sample_budget(self, num_samples: int) -> None:
        """Spend ``num_samples`` samples; a no-op when samples are unmetered."""
        if self.remaining_sample_budget is not None:
            self.remaining_sample_budget -= num_samples

    def exceeds_per_run_budget(self, num_samples: int) -> bool:
        """Return True if ``num_samples`` is above the per-run cap, when one is set."""
        return (
            self.max_samples_per_run is not None
            and num_samples > self.max_samples_per_run
        )


class BudgetLedger:
    """Meters evaluation budget across (split, dataset_id) pairs.

    The keys are also the allowlist of evaluable combinations: a pair with no
    budget entry is rejected by ``validate``.

    In-memory by default. Pass ``persist_path`` for the durable, crash-safe
    variant used by the Harbor sidecar — every mutation is flushed under a
    single-writer lock, and ``reserve`` checks-and-decrements atomically before a
    run so concurrent callers cannot overspend. Budget is never refunded on error.

    If ``persist_path`` already holds a ledger file, the spending recorded there
    is carried over at construction time, so a restart cannot hand back budget
    that was already used.
    """

    def __init__(
        self,
        budgets: list[SplitBudget] | None = None,
        *,
        persist_path: Path | str | None = None,
    ):
        """Index ``budgets`` by (split, dataset_id) and restore persisted spending.

        An existing ledger file that cannot be read or parsed raises instead of
        being ignored, because ignoring it would silently reset every budget.
        """
        self._budgets: dict[tuple[str, str], SplitBudget] = {
            (b.split, b.dataset_id): b for b in (budgets or [])
        }
        self.persist_path = Path(persist_path) if persist_path else None
        self._lock = asyncio.Lock()
        self._restore_persisted()

    def _restore_persisted(self) -> None:
        """Re-apply the spending recorded in ``persist_path`` to the budgets."""
        if self.persist_path is None:
            return
        try:
            raw = self.persist_path.read_text()
        except FileNotFoundError:
            return  # First start: nothing has been spent yet.

        for entry in json.loads(raw):
            key = (entry["split"], entry["dataset_id"])
            budget = self._budgets.get(key)
            if budget is None:
                logger.warning(
                    "Ignoring persisted budget for %s: no such budget is configured.",
                    key,
                )
                continue
            budget.remaining_sample_budget = self._carry_over(
                budget.total_sample_budget,
                entry.get("total_sample_budget"),
                entry.get("remaining_sample_budget"),
            )
            budget.remaining_run_budget = self._carry_over(
                budget.total_run_budget,
                entry.get("total_run_budget"),
                entry.get("remaining_run_budget"),
            )

    @staticmethod
    def _carry_over(
        total: int | None,
        persisted_total: int | None,
        persisted_remaining: int | None,
    ) -> int | None:
        """Return the remaining budget after subtracting what was already spent.

        Working from the amount spent (not the stored remainder) keeps earlier
        spending on the books even if the configured total changed in between.
        """
        if total is None:
            return None  # This limit is no longer enforced.
        if persisted_total is None or persisted_remaining is None:
            return total  # The limit was not metered before; nothing was spent.
        return total - (persisted_total - persisted_remaining)
Comment on lines +102 to +110

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Persisted budgets reset

_flush() writes remaining_sample_budget and remaining_run_budget to persist_path, but constructing a ledger with that same path never reads the file back. After the durable Harbor sidecar spends budget and restarts, BudgetLedger(configured_budgets, persist_path=...) rebuilds fresh SplitBudget objects with their original totals, so callers can spend the same run/sample budget again even though the JSON file already recorded it as used.

Artifacts

Repro: focused persisted budget reset script

  • Contains supporting evidence from the run (text/x-python; charset=utf-8).

Repro: execution output showing persisted remaining budgets ignored after restart

  • Keeps the command output available without making the summary code-heavy.

View artifacts

T-Rex Ran code and verified through T-Rex

Prompt To Fix With AI
This is a comment left during a code review.
Path: vero/src/vero/core/budget.py
Line: 102-110

Comment:
**Persisted budgets reset**

`_flush()` writes `remaining_sample_budget` and `remaining_run_budget` to `persist_path`, but constructing a ledger with that same path never reads the file back. After the durable Harbor sidecar spends budget and restarts, `BudgetLedger(configured_budgets, persist_path=...)` rebuilds fresh `SplitBudget` objects with their original totals, so callers can spend the same run/sample budget again even though the JSON file already recorded it as used.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Claude Code Fix in Codex


def validate(self, dataset_id: str, split: str) -> None:
    """Reject a (split, dataset_id) pair that has no budget entry.

    Raises InvalidSplitError listing the allowed combinations.
    """
    key = (split, dataset_id)
    if key in self._budgets:
        return
    allowed_keys = list(self._budgets.keys())
    raise InvalidSplitError(
        f"No split budget found for the combination (dataset_id={dataset_id}, split={split}) "
        f"either because it does not exist or because it is not allowed. "
        f"Allowed combinations: {allowed_keys}"
    )

def get(self, dataset_id: str, split: str) -> SplitBudget:
    """Look up the budget for a pair, validating membership before the lookup."""
    self.validate(dataset_id, split)
    key = (split, dataset_id)
    return self._budgets[key]

def check(self, dataset_id: str, split: str, num_samples: int) -> None:
    """Raise ExperimentBudgetExceeded when the request does not fit the budget.

    Checks run in order: runs left, remaining samples, then the per-run cap.
    """
    entry = self.get(dataset_id, split)
    if not entry.has_run_budget():
        message = f"No runs left for the {split} split of the {dataset_id} dataset."
    elif not entry.has_sample_budget(num_samples):
        message = (
            f"Requested {num_samples} samples for the {split} split of the {dataset_id} dataset, "
            f"but the remaining sample budget only allows for {entry.remaining_sample_budget} samples."
        )
    elif entry.exceeds_per_run_budget(num_samples):
        message = (
            f"Requested {num_samples} samples for the {split} split of the {dataset_id} dataset, "
            f"but only {entry.max_samples_per_run} are allowed per run."
        )
    else:
        return
    raise ExperimentBudgetExceeded(message)

def _decrement(self, dataset_id: str, split: str, num_samples: int) -> SplitBudget:
"""In-memory decrement only (no flush). Caller is responsible for durability."""
budget = self.get(dataset_id, split)
budget.decrement_sample_budget(num_samples)
budget.decrement_run_budget()
return budget

def record(self, dataset_id: str, split: str, num_samples: int) -> SplitBudget:
    """Charge a completed (or attempted) run to the budget, then flush."""
    charged = self._decrement(dataset_id, split, num_samples)
    self._flush()
    return charged

async def reserve(
self, dataset_id: str, split: str, num_samples: int
) -> SplitBudget:
"""Atomically check + record before a run (durable, single-writer).

Raises InvalidSplitError / ExperimentBudgetExceeded *before* decrementing,
so a rejected request costs nothing; a reserved request is never refunded.
"""
async with self._lock:
self.check(dataset_id, split, num_samples)
budget = self._decrement(dataset_id, split, num_samples)
# Flush off the event loop (to_thread) but still under the lock: the
# to_thread call keeps the synchronous write from blocking the loop
# (the actual bug), while holding the lock keeps concurrent flushes
# from racing on the shared temp file and preserves decrement/flush
# ordering so a crash never persists more remaining budget than was
# committed.
if self.persist_path is not None:
await asyncio.to_thread(self._flush)
return budget

def status(self) -> dict[tuple[str, str], SplitBudget]:
    """Snapshot of every budget, keyed by (split, dataset_id)."""
    snapshot = self._budgets.copy()
    return snapshot

# Write the current state of every budget to ``persist_path`` as JSON.
# Does nothing for an in-memory ledger (``persist_path`` is None).
def _flush(self) -> None:
if self.persist_path is None:
return
# One JSON object per budget, holding both the configured totals and the
# remaining counters.
data = [
{
"split": b.split,
"dataset_id": b.dataset_id,
"total_sample_budget": b.total_sample_budget,
"remaining_sample_budget": b.remaining_sample_budget,
"total_run_budget": b.total_run_budget,
"remaining_run_budget": b.remaining_run_budget,
"max_samples_per_run": b.max_samples_per_run,
}
for b in self._budgets.values()
]
self.persist_path.parent.mkdir(parents=True, exist_ok=True)
# Write to a sibling "<name>.tmp" file first; the replace() call that follows
# moves it over persist_path, so the ledger file is swapped in one step
# rather than rewritten in place.
# NOTE(review): the temp file is not fsync'ed before the replace — confirm
# that is acceptable for the "crash-safe" guarantee.
tmp = self.persist_path.with_suffix(self.persist_path.suffix + ".tmp")
tmp.write_text(json.dumps(data, indent=2))

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reserve() holds self._lock (an asyncio.Lock) across this synchronous tmp.write_text() + replace(). In the durable sidecar path under concurrency this stalls the event loop on every reservation. Keep the lock around only the in-memory check+decrement, and push the flush out with await asyncio.to_thread(...). (Confirms Greptile's P1.)

tmp.replace(self.persist_path)
14 changes: 12 additions & 2 deletions vero/src/vero/core/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ def main():
setup_logging()


# Optional `vero harbor` group (requires the `harbor` extra). It is registered
# here, at import time, only when `vero.harbor.cli` can be imported, so the
# base CLI keeps working without the extra.
# NOTE(review): every ImportError is swallowed silently, including one raised
# by a bug inside `vero.harbor.cli` — confirm that is intended.
try:
from vero.harbor.cli import harbor as _harbor_group

main.add_command(_harbor_group)
except ImportError:
pass


@main.group()
def init():
"""Initialize evaluation scaffolds for your uv project."""
Expand Down Expand Up @@ -578,7 +588,7 @@ def check(
if errors:
click.echo("\n Skipping task discovery (project issues above)")
else:
from vero.evaluator import Evaluator
from vero.evaluation.evaluator import Evaluator
from vero.workspace.git import GitWorkspace

async def _discover():
Expand Down Expand Up @@ -760,7 +770,7 @@ def evaluate(
"""Run an evaluation on an agent codebase."""
import asyncio

from vero.evaluator import run_evaluation
from vero.evaluation.evaluator import run_evaluation

asyncio.run(
run_evaluation(
Expand Down
2 changes: 2 additions & 0 deletions vero/src/vero/core/dataset/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
SplitAccessLevel,
default_split_accesses,
get_non_viewable_splits,
resolve_split_access,
)
from vero.core.dataset.store import (
dataset_exists,
Expand All @@ -27,5 +28,6 @@
"hash_dataset_dict",
"list_datasets",
"load_dataset",
"resolve_split_access",
"save_dataset",
]
41 changes: 37 additions & 4 deletions vero/src/vero/core/dataset/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@ class DefaultSplitNames(StrEnum):


class SplitAccessLevel(StrEnum):
"""Access levels for dataset splits."""
"""Access levels for dataset splits.

Three tiers of increasing restriction:
- viewable: rows materialized + full per-sample results visible.
- non_viewable: no rows, but the split can be evaluated and summary stats seen.
- no_access: no rows, no summary, and not agent-evaluable (admin/verifier only).
"""

viewable = "viewable"
non_viewable = "non_viewable"
no_access = "no_access"


@dataclass
Expand All @@ -40,20 +47,46 @@ def viewable(cls, split: str) -> SplitAccess:
def non_viewable(cls, split: str) -> SplitAccess:
return cls(split=split, access=SplitAccessLevel.non_viewable)

@classmethod
def no_access(cls, split: str) -> SplitAccess:
    """Build a ``SplitAccess`` that puts *split* in the ``no_access`` tier."""
    level = SplitAccessLevel.no_access
    return cls(split=split, access=level)


# Each split is listed exactly once: resolve_split_access returns the first
# matching entry, so a second entry for the same split would never take effect.
default_split_accesses = (
    SplitAccess.no_access(DefaultSplitNames.test),
    SplitAccess.non_viewable(DefaultSplitNames.validation),
)


def get_non_viewable_splits(split_accesses: list[SplitAccess]) -> list[str]:
    """Splits whose rows/details are not viewable (non_viewable and no_access).

    no_access is strictly more restrictive than non_viewable, so it is excluded
    everywhere non_viewable is. The non_viewable/no_access distinction (summary +
    agent-evaluable vs. not) is enforced in the evaluation engine, not here.
    """
    hidden_levels = (SplitAccessLevel.non_viewable, SplitAccessLevel.no_access)
    return [sa.split for sa in split_accesses if sa.access in hidden_levels]


def resolve_split_access(
    split: str, split_accesses: list[SplitAccess]
) -> SplitAccessLevel:
    """Return the access tier of the first entry listed for ``split``.

    Fails closed: a split with no entry resolves to ``no_access`` (not
    agent-evaluable), never to viewable.

    This is the core-side source of truth for split visibility and is kept
    free of any ``vero.harbor`` import so the evaluation engine can gate on it.
    """
    match = next((sa for sa in split_accesses if sa.split == split), None)
    if match is None:
        return SplitAccessLevel.no_access
    return match.access


class DatasetInfo(BaseModel):
"""An identifier and summary of a dataset.

Expand Down
Loading