diff --git a/vero/src/vero/core/budget.py b/vero/src/vero/core/budget.py index 3d370f5..7d738a0 100644 --- a/vero/src/vero/core/budget.py +++ b/vero/src/vero/core/budget.py @@ -142,11 +142,16 @@ def check(self, dataset_id: str, split: str, num_samples: int) -> None: f"but only {budget.max_samples_per_run} are allowed per run." ) - def record(self, dataset_id: str, split: str, num_samples: int) -> SplitBudget: - """Decrement the budget for a completed (or attempted) run and flush.""" + def _decrement(self, dataset_id: str, split: str, num_samples: int) -> SplitBudget: + """In-memory decrement only (no flush). Caller is responsible for durability.""" budget = self.get(dataset_id, split) budget.decrement_sample_budget(num_samples) budget.decrement_run_budget() + return budget + + def record(self, dataset_id: str, split: str, num_samples: int) -> SplitBudget: + """Decrement the budget for a completed (or attempted) run and flush.""" + budget = self._decrement(dataset_id, split, num_samples) self._flush() return budget @@ -160,7 +165,16 @@ async def reserve( """ async with self._lock: self.check(dataset_id, split, num_samples) - return self.record(dataset_id, split, num_samples) + budget = self._decrement(dataset_id, split, num_samples) + # Flush off the event loop (to_thread) but still under the lock: the + # to_thread call keeps the synchronous write from blocking the loop + # (the actual bug), while holding the lock keeps concurrent flushes + # from racing on the shared temp file and preserves decrement/flush + # ordering so a crash never persists more remaining budget than was + # committed. + if self.persist_path is not None: + await asyncio.to_thread(self._flush) + return budget def status(self) -> dict[tuple[str, str], SplitBudget]: """Return all budgets keyed by (split, dataset_id).""" diff --git a/vero/src/vero/core/dataset/__init__.py b/vero/src/vero/core/dataset/__init__.py index 96a0c5c..7274deb 100644 --- a/vero/src/vero/core/dataset/__init__.py +++ b/vero/src/vero/core/dataset/__init__.py @@ -7,6 +7,7 @@ SplitAccessLevel, default_split_accesses, get_non_viewable_splits, + resolve_split_access, ) from vero.core.dataset.store import ( dataset_exists, @@ -27,5 +28,6 @@ "hash_dataset_dict", "list_datasets", "load_dataset", + "resolve_split_access", "save_dataset", ] diff --git a/vero/src/vero/core/dataset/base.py b/vero/src/vero/core/dataset/base.py index 0b5c0bf..8d9b8e9 100644 --- a/vero/src/vero/core/dataset/base.py +++ b/vero/src/vero/core/dataset/base.py @@ -72,6 +72,21 @@ def get_non_viewable_splits(split_accesses: list[SplitAccess]) -> list[str]: ] +def resolve_split_access( + split: str, split_accesses: list[SplitAccess] +) -> SplitAccessLevel: + """Resolve the access tier for a split. Fail closed: an unlisted split is + treated as ``no_access`` (not agent-evaluable), never as viewable. + + This is the core-side source of truth for split visibility, deliberately + free of any ``vero.harbor`` import so the evaluation engine can gate on it. + """ + for sa in split_accesses: + if sa.split == split: + return sa.access + return SplitAccessLevel.no_access + + class DatasetInfo(BaseModel): """An identifier and summary of a dataset. diff --git a/vero/src/vero/evaluation/engine.py b/vero/src/vero/evaluation/engine.py index 5524c57..69b9483 100644 --- a/vero/src/vero/evaluation/engine.py +++ b/vero/src/vero/evaluation/engine.py @@ -15,7 +15,13 @@ from typing import TYPE_CHECKING from vero.core.budget import BudgetLedger, SplitBudget +from vero.core.dataset import ( + SplitAccess, + SplitAccessLevel, + resolve_split_access, +) from vero.core.evaluation import BaseEvaluationParameters +from vero.exceptions import InvalidSplitError if TYPE_CHECKING: from vero.core.db.database import Experiment, ExperimentDatabase @@ -52,6 +58,7 @@ def __init__( run_constraints: BaseEvaluationParameters | None = None, session_id: str | None = None, vero_home: Path | None = None, + split_accesses: list[SplitAccess] | None = None, ): self.evaluator = evaluator self.budget = budget @@ -60,6 +67,7 @@ def __init__( self.run_constraints = run_constraints or BaseEvaluationParameters() self.session_id = session_id self.vero_home = vero_home + self.split_accesses = split_accesses @classmethod def from_session(cls, session) -> EvaluationEngine: @@ -74,6 +82,7 @@ def from_session(cls, session) -> EvaluationEngine: run_constraints=session.evaluation_parameters, session_id=session.session_id, vero_home=session.vero_home, + split_accesses=session.split_accesses, ) # ------------------------------------------------------------------ @@ -140,10 +149,19 @@ def resolve_samples(self, req: EvalRequest) -> tuple[list[int] | None, int]: async def evaluate(self, req: EvalRequest, *, admin: bool = False) -> Experiment: """Meter (unless admin) and run one evaluation; return the full Experiment. - ``no_access`` gating is implicit: those splits are absent from the budget - ledger, so ``reserve`` raises ``InvalidSplitError`` for the agent; admin - bypasses the ledger and may evaluate anything. + ``no_access`` gating is EXPLICIT and fail-closed: when ``split_accesses`` + is configured, the split's tier is resolved (an unlisted split defaults + to ``no_access``) and a ``no_access`` split is hard-rejected for non-admin + callers *before* the budget ledger is consulted. Admin bypasses both the + tier gate and the ledger and may evaluate anything. """ + if not admin and self.split_accesses is not None: + tier = resolve_split_access(req.split, self.split_accesses) + if tier == SplitAccessLevel.no_access: + raise InvalidSplitError( + f"Split '{req.split}' of dataset '{req.dataset_id}' is no_access " + f"and cannot be evaluated by the agent." + ) sample_ids, n = self.resolve_samples(req) if not admin: await self.budget.reserve(req.dataset_id, req.split, n) diff --git a/vero/src/vero/policy.py b/vero/src/vero/policy.py index 247dc1a..99933a8 100644 --- a/vero/src/vero/policy.py +++ b/vero/src/vero/policy.py @@ -12,7 +12,9 @@ from vero.core.constants import default_minimum_score from vero.core.dataset import ( SplitAccess, + SplitAccessLevel, default_split_accesses, + resolve_split_access, ) from vero.core.db.database import Experiment, ExperimentDatabase from vero.core.evaluation import BaseEvaluationParameters @@ -340,6 +342,7 @@ async def init(self) -> None: # Split budget self._build_split_budget() + self._ensure_budgeted_splits_tiered() self._validate_budget_splits() self.session.budget = self.budget @@ -503,8 +506,37 @@ def _build_split_budget(self) -> None: ) self.budget = budgets + def _ensure_budgeted_splits_tiered(self) -> None: + """Ensure every agent-budgeted split has an explicit access tier. + + A budgeted split is one the agent may evaluate, so it must be + agent-evaluable with per-sample labels hidden, i.e. ``non_viewable``. + Any budgeted split missing from ``split_accesses`` is auto-tiered + ``non_viewable`` here, so a budget is never silently unusable + (``no_access`` by omission, which the engine rejects) nor a label leak + (``viewable`` by omission). Explicit author tiers are left untouched and + checked in :meth:`_validate_budget_splits`. + """ + if not self.budget: + return + listed = {sa.split for sa in self.split_accesses} + for b in self.budget: + if b.split not in listed: + self.split_accesses.append(SplitAccess.non_viewable(b.split)) + listed.add(b.split) + logger.info( + f"Budgeted split '{b.split}' was not listed in " + f"split_accesses; defaulting it to non_viewable " + f"(agent-evaluable, per-sample labels hidden)." + ) + def _validate_budget_splits(self) -> None: - """Warn if any budget splits don't exist in the dataset.""" + """Warn if any budget splits do not exist in the dataset, and verify + that every agent-budgeted split is tiered ``non_viewable``. + + ``viewable`` would leak per-sample labels to the agent; ``no_access`` + would make the budget unusable. Both are misconfigurations and raise. + """ if not self.budget: return from vero.core.dataset.store import load_dataset @@ -525,6 +557,15 @@ def _validate_budget_splits(self) -> None: except Exception: pass + tier = resolve_split_access(b.split, self.split_accesses) + if tier != SplitAccessLevel.non_viewable: + raise ValueError( + f"Agent-budgeted split '{b.split}' is tiered '{tier}', but a " + f"budgeted split must be 'non_viewable' (agent-evaluable with " + f"per-sample labels hidden). 'viewable' would leak labels and " + f"'no_access' would make the budget unusable." + ) + def _maybe_make_db(self) -> ExperimentDatabase: """Create or reconstruct the experiment database. diff --git a/vero/tests/test_budget.py b/vero/tests/test_budget.py index c0d5047..52e5559 100644 --- a/vero/tests/test_budget.py +++ b/vero/tests/test_budget.py @@ -92,3 +92,76 @@ def test_no_flush_when_in_memory(self, tmp_path): led = _ledger() # persist_path=None led.record("ds1", "dev", 25) # no file written, no error assert not list(tmp_path.iterdir()) + + +class TestReserveFlushOffLoop: + @pytest.mark.asyncio + async def test_reserve_flushes_durably(self, tmp_path): + path = tmp_path / "ledger.json" + led = _ledger(persist_path=path) + await led.reserve("ds1", "dev", 25) + data = json.loads(path.read_text()) + entry = next(e for e in data if e["split"] == "dev") + assert entry["remaining_sample_budget"] == 75 + assert entry["remaining_run_budget"] == 2 + + @pytest.mark.asyncio + async def test_reserve_flush_goes_through_to_thread(self, tmp_path, monkeypatch): + import asyncio as _asyncio + + path = tmp_path / "ledger.json" + led = _ledger(persist_path=path) + seen = [] + real_to_thread = _asyncio.to_thread + + async def _spy(func, *args, **kwargs): + seen.append(func) + return await real_to_thread(func, *args, **kwargs) + + monkeypatch.setattr("vero.core.budget.asyncio.to_thread", _spy) + await led.reserve("ds1", "dev", 10) + # the durable flush ran off the event loop + assert led._flush in seen + # and it actually persisted + data = json.loads(path.read_text()) + entry = next(e for e in data if e["split"] == "dev") + assert entry["remaining_sample_budget"] == 90 + + @pytest.mark.asyncio + async def test_reserve_no_to_thread_when_in_memory(self, monkeypatch): + import asyncio as _asyncio + + led = _ledger() # persist_path=None + called = [] + real_to_thread = _asyncio.to_thread + + async def _spy(func, *args, **kwargs): + called.append(func) + return await real_to_thread(func, *args, **kwargs) + + monkeypatch.setattr("vero.core.budget.asyncio.to_thread", _spy) + await led.reserve("ds1", "dev", 10) + assert called == [] # no flush path when in-memory + assert led.get("ds1", "dev").remaining_sample_budget == 90 + + @pytest.mark.asyncio + async def test_concurrent_reserves_do_not_overspend(self, tmp_path): + import asyncio as _asyncio + + path = tmp_path / "ledger.json" + # run budget of 3: only 3 of 10 concurrent reserves may succeed + led = BudgetLedger( + [SplitBudget(split="dev", dataset_id="ds1", total_run_budget=3)], + persist_path=path, + ) + + async def _try(): + try: + await led.reserve("ds1", "dev", 0) + return True + except ExperimentBudgetExceeded: + return False + + results = await _asyncio.gather(*[_try() for _ in range(10)]) + assert sum(results) == 3 + assert led.get("ds1", "dev").remaining_run_budget == 0 diff --git a/vero/tests/test_dataset_access.py b/vero/tests/test_dataset_access.py new file mode 100644 index 0000000..a03c9a7 --- /dev/null +++ b/vero/tests/test_dataset_access.py @@ -0,0 +1,23 @@ +"""Tests for resolve_split_access (vero.core.dataset.base): fail-closed tier resolution.""" + +from vero.core.dataset import SplitAccess, SplitAccessLevel, resolve_split_access + + +def test_resolve_listed_tiers(): + accesses = [ + SplitAccess.viewable("train"), + SplitAccess.non_viewable("validation"), + SplitAccess.no_access("test"), + ] + assert resolve_split_access("train", accesses) == SplitAccessLevel.viewable + assert resolve_split_access("validation", accesses) == SplitAccessLevel.non_viewable + assert resolve_split_access("test", accesses) == SplitAccessLevel.no_access + + +def test_resolve_unlisted_split_fails_closed(): + accesses = [SplitAccess.viewable("train")] + assert resolve_split_access("holdout", accesses) == SplitAccessLevel.no_access + + +def test_resolve_empty_accesses_fails_closed(): + assert resolve_split_access("anything", []) == SplitAccessLevel.no_access diff --git a/vero/tests/test_engine.py b/vero/tests/test_engine.py index 789927b..0349adf 100644 --- a/vero/tests/test_engine.py +++ b/vero/tests/test_engine.py @@ -95,3 +95,70 @@ async def test_admin_bypasses_budget(self, monkeypatch): svc.evaluator.evaluate.assert_awaited_once() # nothing metered assert svc.status()[("dev", "ds1")].remaining_run_budget == 3 + + +class TestNoAccessGate: + @pytest.mark.asyncio + async def test_evaluate_no_access_split_rejected_before_ledger(self, monkeypatch): + from vero.core.dataset import SplitAccess + + # Pathological-but-instructive: a no_access split that DOES have a ledger + # entry. Without the explicit tier gate the implicit ledger check would + # let it through. With the gate it must be rejected before reserve(). + svc = _make_service( + budgets=[ + SplitBudget(split="test", dataset_id="ds1", total_sample_budget=100, total_run_budget=3) + ], + monkeypatch=monkeypatch, + ) + svc.split_accesses = [SplitAccess.no_access("test")] + with pytest.raises(InvalidSplitError): + await svc.evaluate( + EvalRequest(dataset_id="ds1", split="test", commit="c1", num_samples=10) + ) + svc.evaluator.evaluate.assert_not_awaited() + # nothing metered: tier gate fired before the ledger + assert svc.status()[("test", "ds1")].remaining_run_budget == 3 + assert svc.status()[("test", "ds1")].remaining_sample_budget == 100 + + @pytest.mark.asyncio + async def test_evaluate_unlisted_split_defaults_no_access(self, monkeypatch): + from vero.core.dataset import SplitAccess + + svc = _make_service( + budgets=[ + SplitBudget(split="test", dataset_id="ds1", total_sample_budget=100, total_run_budget=3) + ], + monkeypatch=monkeypatch, + ) + # 'test' is not listed in split_accesses at all -> fail closed to no_access + svc.split_accesses = [SplitAccess.viewable("dev")] + with pytest.raises(InvalidSplitError): + await svc.evaluate( + EvalRequest(dataset_id="ds1", split="test", commit="c1", num_samples=10) + ) + svc.evaluator.evaluate.assert_not_awaited() + + @pytest.mark.asyncio + async def test_admin_bypasses_no_access_gate(self, monkeypatch): + from vero.core.dataset import SplitAccess + + svc = _make_service(monkeypatch=monkeypatch) + svc.split_accesses = [SplitAccess.no_access("test")] + await svc.evaluate( + EvalRequest(dataset_id="ds1", split="test", commit="c1", num_samples=10), + admin=True, + ) + svc.evaluator.evaluate.assert_awaited_once() + + @pytest.mark.asyncio + async def test_non_viewable_split_still_evaluable(self, monkeypatch): + from vero.core.dataset import SplitAccess + + svc = _make_service(monkeypatch=monkeypatch) # 'dev' budget present + svc.split_accesses = [SplitAccess.non_viewable("dev")] + await svc.evaluate( + EvalRequest(dataset_id="ds1", split="dev", commit="c1", num_samples=10) + ) + svc.evaluator.evaluate.assert_awaited_once() + assert svc.status()[("dev", "ds1")].remaining_run_budget == 2 diff --git a/vero/tests/test_policy_budget_splits.py b/vero/tests/test_policy_budget_splits.py new file mode 100644 index 0000000..b5c6826 --- /dev/null +++ b/vero/tests/test_policy_budget_splits.py @@ -0,0 +1,70 @@ +"""Tests for Policy budget/split-access reconciliation (vero.policy).""" + +import pytest + +from vero.core.budget import SplitBudget +from vero.core.dataset import ( + SplitAccess, + SplitAccessLevel, + default_split_accesses, + resolve_split_access, +) +from vero.policy import Policy + + +class _StubSession: + dataset_id = "ds1" + + +def _policy(split_accesses, budget): + # Construct without __init__/initialize; set only what the two methods read. + # sessions_dir/dataset_cache/session_id are read-only properties; we never + # set them. _validate_budget_splits reads them only inside a try/except + # (load_dataset), so the resulting AttributeError is swallowed and the tier + # check still runs. + p = Policy.__new__(Policy) + p.budget = budget + p.split_accesses = list(split_accesses) + p.session = _StubSession() + return p + + +def test_ensure_auto_tiers_unlisted_budgeted_split_non_viewable(): + p = _policy([], [SplitBudget(split="train", dataset_id="ds1", total_run_budget=5)]) + p._ensure_budgeted_splits_tiered() + assert resolve_split_access("train", p.split_accesses) == SplitAccessLevel.non_viewable + + +def test_ensure_then_validate_passes_for_train_budget_default(): + p = _policy( + list(default_split_accesses), + [SplitBudget(split="train", dataset_id="ds1", total_run_budget=5)], + ) + p._ensure_budgeted_splits_tiered() + p._validate_budget_splits() # no raise + + +def test_validate_rejects_explicit_viewable_budgeted_split(): + p = _policy( + [SplitAccess.viewable("dev")], + [SplitBudget(split="dev", dataset_id="ds1", total_sample_budget=10)], + ) + with pytest.raises(ValueError, match="non_viewable"): + p._validate_budget_splits() + + +def test_validate_rejects_explicit_no_access_budgeted_split(): + p = _policy( + [SplitAccess.no_access("dev")], + [SplitBudget(split="dev", dataset_id="ds1", total_sample_budget=10)], + ) + with pytest.raises(ValueError, match="non_viewable"): + p._validate_budget_splits() + + +def test_validate_accepts_non_viewable_budgeted_split(): + p = _policy( + [SplitAccess.non_viewable("dev")], + [SplitBudget(split="dev", dataset_id="ds1", total_sample_budget=10)], + ) + p._validate_budget_splits() # no raise