Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions vero/src/vero/core/budget.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,16 @@ def check(self, dataset_id: str, split: str, num_samples: int) -> None:
f"but only {budget.max_samples_per_run} are allowed per run."
)

def record(self, dataset_id: str, split: str, num_samples: int) -> SplitBudget:
"""Decrement the budget for a completed (or attempted) run and flush."""
def _decrement(self, dataset_id: str, split: str, num_samples: int) -> SplitBudget:
"""In-memory decrement only (no flush). Caller is responsible for durability."""
budget = self.get(dataset_id, split)
budget.decrement_sample_budget(num_samples)
budget.decrement_run_budget()
return budget

def record(self, dataset_id: str, split: str, num_samples: int) -> SplitBudget:
    """Charge a completed (or attempted) run to the budget, then flush.

    Delegates the in-memory bookkeeping to ``self._decrement`` and then calls
    ``self._flush()`` synchronously before returning the updated budget.
    """
    updated = self._decrement(dataset_id, split, num_samples)
    # Flush only after the decrement so the flushed state includes this run.
    self._flush()
    return updated

Expand All @@ -160,7 +165,16 @@ async def reserve(
"""
async with self._lock:
self.check(dataset_id, split, num_samples)
return self.record(dataset_id, split, num_samples)
budget = self._decrement(dataset_id, split, num_samples)
# Flush off the event loop (to_thread) but still under the lock: the
# to_thread call keeps the synchronous write from blocking the loop
# (the actual bug), while holding the lock keeps concurrent flushes
# from racing on the shared temp file and preserves decrement/flush
# ordering so a crash never persists more remaining budget than was
# committed.
if self.persist_path is not None:
await asyncio.to_thread(self._flush)
return budget

def status(self) -> dict[tuple[str, str], SplitBudget]:
"""Return all budgets keyed by (split, dataset_id)."""
Expand Down
2 changes: 2 additions & 0 deletions vero/src/vero/core/dataset/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
SplitAccessLevel,
default_split_accesses,
get_non_viewable_splits,
resolve_split_access,
)
from vero.core.dataset.store import (
dataset_exists,
Expand All @@ -27,5 +28,6 @@
"hash_dataset_dict",
"list_datasets",
"load_dataset",
"resolve_split_access",
"save_dataset",
]
15 changes: 15 additions & 0 deletions vero/src/vero/core/dataset/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ def get_non_viewable_splits(split_accesses: list[SplitAccess]) -> list[str]:
]


def resolve_split_access(
    split: str, split_accesses: list[SplitAccess]
) -> SplitAccessLevel:
    """Return the access tier configured for ``split``, failing closed.

    The first entry in ``split_accesses`` whose ``split`` equals the requested
    name decides the result. A split that is not listed at all resolves to
    ``SplitAccessLevel.no_access`` (not agent-evaluable), never to a viewable
    tier.

    This is the core-side source of truth for split visibility, deliberately
    free of any ``vero.harbor`` import so the evaluation engine can gate on it.
    """
    not_listed = object()
    tier = next(
        (entry.access for entry in split_accesses if entry.split == split),
        not_listed,
    )
    if tier is not_listed:
        # Fail closed: an unlisted split is never treated as accessible.
        return SplitAccessLevel.no_access
    return tier


class DatasetInfo(BaseModel):
"""An identifier and summary of a dataset.

Expand Down
24 changes: 21 additions & 3 deletions vero/src/vero/evaluation/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
from typing import TYPE_CHECKING

from vero.core.budget import BudgetLedger, SplitBudget
from vero.core.dataset import (
SplitAccess,
SplitAccessLevel,
resolve_split_access,
)
from vero.core.evaluation import BaseEvaluationParameters
from vero.exceptions import InvalidSplitError

if TYPE_CHECKING:
from vero.core.db.database import Experiment, ExperimentDatabase
Expand Down Expand Up @@ -52,6 +58,7 @@ def __init__(
run_constraints: BaseEvaluationParameters | None = None,
session_id: str | None = None,
vero_home: Path | None = None,
split_accesses: list[SplitAccess] | None = None,
):
self.evaluator = evaluator
self.budget = budget
Expand All @@ -60,6 +67,7 @@ def __init__(
self.run_constraints = run_constraints or BaseEvaluationParameters()
self.session_id = session_id
self.vero_home = vero_home
self.split_accesses = split_accesses

@classmethod
def from_session(cls, session) -> EvaluationEngine:
Expand All @@ -74,6 +82,7 @@ def from_session(cls, session) -> EvaluationEngine:
run_constraints=session.evaluation_parameters,
session_id=session.session_id,
vero_home=session.vero_home,
split_accesses=session.split_accesses,
)

# ------------------------------------------------------------------
Expand Down Expand Up @@ -140,10 +149,19 @@ def resolve_samples(self, req: EvalRequest) -> tuple[list[int] | None, int]:
async def evaluate(self, req: EvalRequest, *, admin: bool = False) -> Experiment:
"""Meter (unless admin) and run one evaluation; return the full Experiment.

``no_access`` gating is implicit: those splits are absent from the budget
ledger, so ``reserve`` raises ``InvalidSplitError`` for the agent; admin
bypasses the ledger and may evaluate anything.
``no_access`` gating is EXPLICIT and fail-closed: when ``split_accesses``
is configured, the split's tier is resolved (an unlisted split defaults
to ``no_access``) and a ``no_access`` split is hard-rejected for non-admin
callers *before* the budget ledger is consulted. Admin bypasses both the
tier gate and the ledger and may evaluate anything.
"""
if not admin and self.split_accesses is not None:
tier = resolve_split_access(req.split, self.split_accesses)
if tier == SplitAccessLevel.no_access:
raise InvalidSplitError(
f"Split '{req.split}' of dataset '{req.dataset_id}' is no_access "
f"and cannot be evaluated by the agent."
)
sample_ids, n = self.resolve_samples(req)
if not admin:
await self.budget.reserve(req.dataset_id, req.split, n)
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Expand Down
43 changes: 42 additions & 1 deletion vero/src/vero/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from vero.core.constants import default_minimum_score
from vero.core.dataset import (
SplitAccess,
SplitAccessLevel,
default_split_accesses,
resolve_split_access,
)
from vero.core.db.database import Experiment, ExperimentDatabase
from vero.core.evaluation import BaseEvaluationParameters
Expand Down Expand Up @@ -340,6 +342,7 @@ async def init(self) -> None:

# Split budget
self._build_split_budget()
self._ensure_budgeted_splits_tiered()
self._validate_budget_splits()
self.session.budget = self.budget

Expand Down Expand Up @@ -503,8 +506,37 @@ def _build_split_budget(self) -> None:
)
self.budget = budgets

def _ensure_budgeted_splits_tiered(self) -> None:
"""Ensure every agent-budgeted split has an explicit access tier.

A budgeted split is one the agent may evaluate, so it must be
agent-evaluable with per-sample labels hidden, i.e. ``non_viewable``.
Any budgeted split missing from ``split_accesses`` is auto-tiered
``non_viewable`` here, so a budget is never silently unusable
(``no_access`` by omission, which the engine rejects) nor a label leak
(``viewable`` by omission). Explicit author tiers are left untouched and
checked in :meth:`_validate_budget_splits`.
"""
if not self.budget:
return
listed = {sa.split for sa in self.split_accesses}
for b in self.budget:
if b.split not in listed:
self.split_accesses.append(SplitAccess.non_viewable(b.split))
listed.add(b.split)
logger.info(
f"Budgeted split '{b.split}' was not listed in "
f"split_accesses; defaulting it to non_viewable "
f"(agent-evaluable, per-sample labels hidden)."
)

def _validate_budget_splits(self) -> None:
"""Warn if any budget splits don't exist in the dataset."""
"""Warn if any budget splits do not exist in the dataset, and verify
that every agent-budgeted split is tiered ``non_viewable``.

``viewable`` would leak per-sample labels to the agent; ``no_access``
would make the budget unusable. Both are misconfigurations and raise.
"""
if not self.budget:
return
from vero.core.dataset.store import load_dataset
Expand All @@ -525,6 +557,15 @@ def _validate_budget_splits(self) -> None:
except Exception:
pass

tier = resolve_split_access(b.split, self.split_accesses)
if tier != SplitAccessLevel.non_viewable:
raise ValueError(
f"Agent-budgeted split '{b.split}' is tiered '{tier}', but a "
f"budgeted split must be 'non_viewable' (agent-evaluable with "
f"per-sample labels hidden). 'viewable' would leak labels and "
f"'no_access' would make the budget unusable."
)

def _maybe_make_db(self) -> ExperimentDatabase:
"""Create or reconstruct the experiment database.

Expand Down
73 changes: 73 additions & 0 deletions vero/tests/test_budget.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,76 @@ def test_no_flush_when_in_memory(self, tmp_path):
led = _ledger() # persist_path=None
led.record("ds1", "dev", 25) # no file written, no error
assert not list(tmp_path.iterdir())


class TestReserveFlushOffLoop:
    """Tests for ``reserve``: persisted output, flushing via
    ``asyncio.to_thread``, and concurrent reservations."""

    @staticmethod
    def _spy_on_to_thread(monkeypatch):
        """Wrap ``asyncio.to_thread`` as seen from ``vero.core.budget``.

        Returns the list that collects every function dispatched through it.
        """
        import asyncio as _asyncio

        dispatched = []
        original_to_thread = _asyncio.to_thread

        async def _recording_to_thread(func, *args, **kwargs):
            dispatched.append(func)
            return await original_to_thread(func, *args, **kwargs)

        monkeypatch.setattr(
            "vero.core.budget.asyncio.to_thread", _recording_to_thread
        )
        return dispatched

    @staticmethod
    def _persisted_entry(path, split):
        """Read the ledger file at ``path`` and return the record for ``split``."""
        records = json.loads(path.read_text())
        return next(rec for rec in records if rec["split"] == split)

    @pytest.mark.asyncio
    async def test_reserve_flushes_durably(self, tmp_path):
        ledger_file = tmp_path / "ledger.json"
        ledger = _ledger(persist_path=ledger_file)

        await ledger.reserve("ds1", "dev", 25)

        dev = self._persisted_entry(ledger_file, "dev")
        assert dev["remaining_sample_budget"] == 75
        assert dev["remaining_run_budget"] == 2

    @pytest.mark.asyncio
    async def test_reserve_flush_goes_through_to_thread(self, tmp_path, monkeypatch):
        ledger_file = tmp_path / "ledger.json"
        ledger = _ledger(persist_path=ledger_file)
        dispatched = self._spy_on_to_thread(monkeypatch)

        await ledger.reserve("ds1", "dev", 10)

        # The flush was handed to to_thread rather than run inline...
        assert ledger._flush in dispatched
        # ...and the write really reached the file.
        dev = self._persisted_entry(ledger_file, "dev")
        assert dev["remaining_sample_budget"] == 90

    @pytest.mark.asyncio
    async def test_reserve_no_to_thread_when_in_memory(self, monkeypatch):
        ledger = _ledger()  # persist_path=None
        dispatched = self._spy_on_to_thread(monkeypatch)

        await ledger.reserve("ds1", "dev", 10)

        # No flush path is taken for an in-memory ledger.
        assert dispatched == []
        assert ledger.get("ds1", "dev").remaining_sample_budget == 90

    @pytest.mark.asyncio
    async def test_concurrent_reserves_do_not_overspend(self, tmp_path):
        import asyncio as _asyncio

        # Run budget of 3: only 3 of the 10 concurrent reserves may succeed.
        ledger = BudgetLedger(
            [SplitBudget(split="dev", dataset_id="ds1", total_run_budget=3)],
            persist_path=tmp_path / "ledger.json",
        )

        async def _attempt():
            try:
                await ledger.reserve("ds1", "dev", 0)
            except ExperimentBudgetExceeded:
                return False
            return True

        outcomes = await _asyncio.gather(*(_attempt() for _ in range(10)))
        assert outcomes.count(True) == 3
        assert ledger.get("ds1", "dev").remaining_run_budget == 0
23 changes: 23 additions & 0 deletions vero/tests/test_dataset_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Tests for resolve_split_access (vero.core.dataset.base): fail-closed tier resolution."""

from vero.core.dataset import SplitAccess, SplitAccessLevel, resolve_split_access


def test_resolve_listed_tiers():
    """Each explicitly listed split resolves to the tier it was declared with."""
    declared = [
        SplitAccess.viewable("train"),
        SplitAccess.non_viewable("validation"),
        SplitAccess.no_access("test"),
    ]
    expected = {
        "train": SplitAccessLevel.viewable,
        "validation": SplitAccessLevel.non_viewable,
        "test": SplitAccessLevel.no_access,
    }
    for split, tier in expected.items():
        assert resolve_split_access(split, declared) == tier


def test_resolve_unlisted_split_fails_closed():
    """A split missing from a non-empty access list resolves to ``no_access``."""
    only_train = [SplitAccess.viewable("train")]
    resolved = resolve_split_access("holdout", only_train)
    assert resolved == SplitAccessLevel.no_access


def test_resolve_empty_accesses_fails_closed():
    """With no access entries at all, any split resolves to ``no_access``."""
    resolved = resolve_split_access("anything", [])
    assert resolved == SplitAccessLevel.no_access
67 changes: 67 additions & 0 deletions vero/tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,70 @@ async def test_admin_bypasses_budget(self, monkeypatch):
svc.evaluator.evaluate.assert_awaited_once()
# nothing metered
assert svc.status()[("dev", "ds1")].remaining_run_budget == 3


class TestNoAccessGate:
    """Tests for the explicit split-access tier gate in ``evaluate``."""

    @staticmethod
    def _request(split):
        """Build the evaluation request used by every test, for ``split``."""
        return EvalRequest(dataset_id="ds1", split=split, commit="c1", num_samples=10)

    @staticmethod
    def _test_split_budgets():
        """Budget list holding a single ledger entry for the 'test' split."""
        return [
            SplitBudget(split="test", dataset_id="ds1", total_sample_budget=100, total_run_budget=3)
        ]

    @pytest.mark.asyncio
    async def test_evaluate_no_access_split_rejected_before_ledger(self, monkeypatch):
        from vero.core.dataset import SplitAccess

        # Deliberately odd setup: the split is no_access yet HAS a ledger entry.
        # A ledger-only check would let it through, so the rejection below must
        # come from the tier gate, before reserve() is reached.
        svc = _make_service(budgets=self._test_split_budgets(), monkeypatch=monkeypatch)
        svc.split_accesses = [SplitAccess.no_access("test")]

        with pytest.raises(InvalidSplitError):
            await svc.evaluate(self._request("test"))

        svc.evaluator.evaluate.assert_not_awaited()
        # Nothing was metered: the gate fired before the ledger was touched.
        remaining = svc.status()[("test", "ds1")]
        assert remaining.remaining_run_budget == 3
        assert remaining.remaining_sample_budget == 100

    @pytest.mark.asyncio
    async def test_evaluate_unlisted_split_defaults_no_access(self, monkeypatch):
        from vero.core.dataset import SplitAccess

        svc = _make_service(budgets=self._test_split_budgets(), monkeypatch=monkeypatch)
        # 'test' does not appear in split_accesses -> resolved as no_access.
        svc.split_accesses = [SplitAccess.viewable("dev")]

        with pytest.raises(InvalidSplitError):
            await svc.evaluate(self._request("test"))

        svc.evaluator.evaluate.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_admin_bypasses_no_access_gate(self, monkeypatch):
        from vero.core.dataset import SplitAccess

        svc = _make_service(monkeypatch=monkeypatch)
        svc.split_accesses = [SplitAccess.no_access("test")]

        await svc.evaluate(self._request("test"), admin=True)

        svc.evaluator.evaluate.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_non_viewable_split_still_evaluable(self, monkeypatch):
        from vero.core.dataset import SplitAccess

        svc = _make_service(monkeypatch=monkeypatch)  # 'dev' budget present
        svc.split_accesses = [SplitAccess.non_viewable("dev")]

        await svc.evaluate(self._request("dev"))

        svc.evaluator.evaluate.assert_awaited_once()
        assert svc.status()[("dev", "ds1")].remaining_run_budget == 2
Loading