feat(run-ops): run-store routing seam + run-engine read seams#4116
feat(run-ops): run-store routing seam + run-engine read seams#4116d-cs wants to merge 16 commits into
Conversation
|
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
WalkthroughChangesThis PR introduces a run-ops database split abstraction across RunEngine: a Sequence Diagram(s)See embedded diagrams within layer sections above (ControlPlaneResolver resolution and completeWaitpoint cross-seam routing). Related PRs: None identified. Suggested labels: run-engine, database, tests, refactor Suggested reviewers: matt-aitken, ericallam 🐰 A rabbit hops through seams of code, 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
7e5633b to
d331802
Compare
88d1290 to
d5610a9
Compare
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal-packages/run-engine/src/engine/index.ts (1)
2194-2212: 🩺 Stability & Availability | 🟡 Minor | ⚡ Quick winSingle catch-all in
quit()can leak resources without logging.All shutdown steps share one try/catch; an early failure (e.g.
runQueue.quit()) aborts the rest of the chain —runLockRedis.quit(),batchQueue.close(), anddebounceSystem.quit()(the two new additions here) never run — and the error is silently discarded (no log). This risks leaked Redis connections and hides real shutdown failures, which matters givenengine.quit()runs infinallyblocks across the entire test suite.♻️ Proposed fix: isolate each shutdown step
async quit() { - try { - this.workerQueueObserverAbortController?.abort(); - - await this.runQueue.quit(); - await this.worker.stop(); - await this.ttlWorker.stop(); - await this.runLock.quit(); - - // This is just a failsafe - await this.runLockRedis.quit(); - - await this.batchQueue.close(); - - await this.debounceSystem.quit(); - } catch (error) { - // Best-effort shutdown; ignore quit/close errors. - } + this.workerQueueObserverAbortController?.abort(); + + const results = await Promise.allSettled([ + this.runQueue.quit(), + this.worker.stop(), + this.ttlWorker.stop(), + this.runLock.quit(), + this.runLockRedis.quit(), + this.batchQueue.close(), + this.debounceSystem.quit(), + ]); + + for (const result of results) { + if (result.status === "rejected") { + this.logger.error("RunEngine.quit(): a resource failed to close", { + error: result.reason, + }); + } + } }
🧹 Nitpick comments (5)
apps/webapp/app/presenters/v3/ApiBatchResultsPresenter.server.ts (1)
13-28: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winStore singleton coupling regresses presenter testability.
The lookup now goes entirely through the
runStoremodule singleton instead of the injectedthis._prisma.this._prismais forwarded only as an ignored hint in split-router mode (perrunOpsStore.findBatchTaskRunByFriendlyId, which explicitly never forwards the caller's client). This means the presenter's core lookup logic can no longer be exercised by simply constructingnew ApiBatchResultsPresenter(fakePrismaClient)— as evidenced by the new split test needingvi.mock("~/v3/runStore.server", ...)andvi.mock("~/db.server", ...)to test this path at all.Consider accepting
runStoreas an optional constructor parameter (defaulting to the singleton import), which would let tests inject a fake/real store directly without module-level mocking, aligning with the "avoid mocks... use testcontainers" test guideline for the coverage this change necessitates.♻️ Sketch of injectable store
export class ApiBatchResultsPresenter extends BasePresenter { + constructor(private readonly store: RunStore = runStore) { + super(); + } + public async call( friendlyId: string, env: AuthenticatedEnvironment ): Promise<BatchTaskRunExecutionResult | undefined> { return this.traceWithEnv("call", env, async (span) => { - const batchRun = await runStore.findBatchTaskRunByFriendlyId( + const batchRun = await this.store.findBatchTaskRunByFriendlyId( friendlyId, env.id,As per coding guidelines, "Avoid mocks or stubs in tests; when Redis or Postgres are needed, use the helpers from
@internal/testcontainers."Source: Coding guidelines
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (1)
397-482: 🚀 Performance & Scalability | 🔵 Trivial | ⚡ Quick winConsider moving the Redis
worker.ackout of the DB transaction.
this.$.worker.ack(\expireRun:${taskRun.id}`)(Line 477) is an external Redis call executed insiderunStore.runInTransaction`. This holds the run-ops DB transaction (and its row locks) open across a Redis round-trip, and the ack cannot be rolled back if the surrounding transaction later fails to commit. Since the ack only needs to happen once the attempt-start is durable, it reads more safely after a successful commit.♻️ Suggested restructure
- return { updatedRun: run, snapshot: newSnapshot }; + return { updatedRun: run, snapshot: newSnapshot, hadTtl: !!taskRun.ttl }; }) ); ... const { updatedRun, snapshot } = result; + + if (taskRun.ttl) { + //don't expire the run, it's going to execute + await this.$.worker.ack(`expireRun:${taskRun.id}`); + }internal-packages/run-engine/src/engine/tests/blockEdgeResidency.test.ts (1)
36-198: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winExtract shared run-ops test fixtures.
seedControlPlaneEnv,buildCreateRunInput,baseEngineOptions, andmakeRouterare duplicated almost verbatim incompleteWaitpointReadResidency.test.tsanddatetimeWaitpointColocation.test.ts(and largely overlap withseedExecutingKsuidParent's pattern too). Consider extracting these into a shared helper module (e.g.internal-packages/run-engine/src/engine/tests/runOpsTestFixtures.ts) to avoid drift as the routing seam evolves.♻️ Sketch of shared fixture extraction
// internal-packages/run-engine/src/engine/tests/runOpsTestFixtures.ts export function baseEngineOptions(redisOptions: any, prisma: any) { /* ... */ } export async function seedControlPlaneEnv(prisma: PrismaClient, suffix: string) { /* ... */ } export function buildCreateRunInput(params: { /* ... */ }): CreateRunInput { /* ... */ } export function makeRouter(prisma14: PrismaClient, prisma17: RunOpsPrismaClient) { /* ... */ }Then import from each two-DB test file instead of redefining.
internal-packages/run-engine/src/engine/tests/lifecycleRouter.test.ts (1)
13-117: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winShared CountingRunStore/engine-options boilerplate with
triggerCreateRouting.test.ts.
baseEngineOptions,baseTriggerParams, and theCountingRunStoresubclass largely overlap with the same-purpose definitions intriggerCreateRouting.test.ts(differing only in which methods are overridden). Consider consolidating into a shared counting-store test helper that both files import and extend/compose.internal-packages/run-store/src/PostgresRunStore.batchProbeReadClient.test.ts (1)
30-93: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winDuplicated test fixture helpers across files.
seedEnvironment,makeDedicatedStore,makeLegacyStore, andmakeSplitRouterare duplicated near-verbatim inPostgresRunStore.dedicatedRepro.test.ts(and similar helpers inPostgresRunStore.dedicatedSelect.test.ts). Consider extracting these into a shared test-fixture module (e.g.,internal-packages/run-store/src/testFixtures.ts) to avoid drift if the store/router constructor signatures change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: ee2372e3-d4a0-48ea-b6a5-54e062ce6cec
📒 Files selected for processing (71)
apps/webapp/app/presenters/v3/ApiBatchResultsPresenter.server.tsapps/webapp/test/presenters/ApiBatchResultsPresenter.split.test.tsapps/webapp/test/updateMetadataStoreRoutingHetero.test.tsinternal-packages/run-engine/src/engine/controlPlaneResolver.tsinternal-packages/run-engine/src/engine/errors.tsinternal-packages/run-engine/src/engine/index.tsinternal-packages/run-engine/src/engine/systems/batchSystem.test.tsinternal-packages/run-engine/src/engine/systems/batchSystem.tsinternal-packages/run-engine/src/engine/systems/checkpointSystem.tsinternal-packages/run-engine/src/engine/systems/debounceSystem.test.tsinternal-packages/run-engine/src/engine/systems/debounceSystem.tsinternal-packages/run-engine/src/engine/systems/delayedRunSystem.test.tsinternal-packages/run-engine/src/engine/systems/delayedRunSystem.tsinternal-packages/run-engine/src/engine/systems/dequeueSystem.tsinternal-packages/run-engine/src/engine/systems/enqueueSystem.test.tsinternal-packages/run-engine/src/engine/systems/enqueueSystem.tsinternal-packages/run-engine/src/engine/systems/executionSnapshotSystem.test.tsinternal-packages/run-engine/src/engine/systems/executionSnapshotSystem.tsinternal-packages/run-engine/src/engine/systems/pendingVersionSystem.test.tsinternal-packages/run-engine/src/engine/systems/pendingVersionSystem.tsinternal-packages/run-engine/src/engine/systems/runAttemptSystem.test.tsinternal-packages/run-engine/src/engine/systems/runAttemptSystem.tsinternal-packages/run-engine/src/engine/systems/systems.tsinternal-packages/run-engine/src/engine/systems/ttlSystem.test.tsinternal-packages/run-engine/src/engine/systems/ttlSystem.tsinternal-packages/run-engine/src/engine/systems/waitpointSystem.test.tsinternal-packages/run-engine/src/engine/systems/waitpointSystem.tsinternal-packages/run-engine/src/engine/tests/blockEdgeResidency.test.tsinternal-packages/run-engine/src/engine/tests/checkpointSystem.controlPlaneResolver.test.tsinternal-packages/run-engine/src/engine/tests/checkpointSystemStore.test.tsinternal-packages/run-engine/src/engine/tests/completeWaitpointCrossSeamGuard.test.tsinternal-packages/run-engine/src/engine/tests/completeWaitpointReadResidency.test.tsinternal-packages/run-engine/src/engine/tests/controlPlaneResolverInjectability.test.tsinternal-packages/run-engine/src/engine/tests/datetimeWaitpointColocation.test.tsinternal-packages/run-engine/src/engine/tests/debounce.test.tsinternal-packages/run-engine/src/engine/tests/delayedRunSystem.controlPlaneResolver.test.tsinternal-packages/run-engine/src/engine/tests/dequeueSystem.controlPlaneResolver.test.tsinternal-packages/run-engine/src/engine/tests/dequeueSystem.recovery.controlPlaneResolver.test.tsinternal-packages/run-engine/src/engine/tests/engineResidualInversions.controlPlaneResolver.test.tsinternal-packages/run-engine/src/engine/tests/lifecycleRouter.test.tsinternal-packages/run-engine/src/engine/tests/runAttemptSystem.controlPlaneResolver.test.tsinternal-packages/run-engine/src/engine/tests/runStoreInjectability.test.tsinternal-packages/run-engine/src/engine/tests/triggerCreateRouting.test.tsinternal-packages/run-engine/src/engine/tests/ttl.test.tsinternal-packages/run-engine/src/engine/tests/waitpointPublicRouter.test.tsinternal-packages/run-engine/src/engine/tests/waitpointSystem.controlPlaneResolver.test.tsinternal-packages/run-engine/src/engine/types.tsinternal-packages/run-engine/src/index.tsinternal-packages/run-store/package.jsoninternal-packages/run-store/src/NoopRunStore.tsinternal-packages/run-store/src/PostgresRunStore.batchProbeReadClient.test.tsinternal-packages/run-store/src/PostgresRunStore.crossGenerationError.test.tsinternal-packages/run-store/src/PostgresRunStore.dedicatedRepro.test.tsinternal-packages/run-store/src/PostgresRunStore.dedicatedSelect.test.tsinternal-packages/run-store/src/PostgresRunStore.dualSchema.test.tsinternal-packages/run-store/src/PostgresRunStore.test.tsinternal-packages/run-store/src/PostgresRunStore.tsinternal-packages/run-store/src/PostgresRunStore.writeAtomicity.test.tsinternal-packages/run-store/src/batchCompletionResidency.test.tsinternal-packages/run-store/src/clientCompat.test.tsinternal-packages/run-store/src/index.tsinternal-packages/run-store/src/runOpsStore.flipWindowDuplicate.test.tsinternal-packages/run-store/src/runOpsStore.forWaitpointCompletion.test.tsinternal-packages/run-store/src/runOpsStore.idempotencyDedup.test.tsinternal-packages/run-store/src/runOpsStore.mixedResidency.test.tsinternal-packages/run-store/src/runOpsStore.readAfterWrite.test.tsinternal-packages/run-store/src/runOpsStore.snapshots.test.tsinternal-packages/run-store/src/runOpsStore.test.tsinternal-packages/run-store/src/runOpsStore.tsinternal-packages/run-store/src/runOpsStore.waitpoints.test.tsinternal-packages/run-store/src/types.ts
💤 Files with no reviewable changes (1)
- internal-packages/run-store/src/NoopRunStore.ts
|
On the out-of-diff |
@trigger.dev/build
trigger.dev
@trigger.dev/core
@trigger.dev/python
@trigger.dev/react-hooks
@trigger.dev/redis-worker
@trigger.dev/rsc
@trigger.dev/schema-to-json
@trigger.dev/sdk
commit: |
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…uting tests The process-global mint flag was removed from friendlyId; drive NEW-residency ids via generateKsuidId() directly instead of the removed setKsuidMintEnabled(true).
…em test comments Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Migration/drain is deferred; residency is decided purely by id-shape, so the live-migration marker/fence machinery is unused. Removes redirect_marker DB usage.
… describe current enqueue system Strip development-process scaffolding labels (Test A/B, Group A, GAP A/B, Step N) from run-engine test comments and describe/containerTest title strings, keeping the meaningful behavioral descriptions. Also reword the enqueueSystem store JSDoc to describe current behavior instead of a historical delta. Comments and title strings only; no test logic touched. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… drop test enumeration labels Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… comment Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Aligns stripGroupARelations / #hydrateGroupARelations / *_GROUP_A with the already-renamed DedicatedRelationHydrator / DedicatedRelationSpec naming. Identifier-only, no behavior change. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…er, not its lagging replica RoutingRunStore.findRun/findRunOrThrow discarded the caller's client and always routed reads to the owning store's replica. Read-after-write callers (api.v1.sessions, api.v1.tasks.$taskId.trigger, sessions.end-and-continue) deliberately pass the control-plane WRITER to read back a just-committed run and beat replica lag. Routed to the lagging replica the read returned null → the run was "not found" → HTTP 500 (intermittent, hottest on the trigger read-back). Key on the passed client's IDENTITY instead of dropping it: a WRITER (exposes $transaction; a read replica does not) signals read-your-writes, so route to the OWNING store's own primary (writer) for BOTH residencies — the legacy writer for a cuid id, the NEW writer for a ksuid id — WITHOUT leaking a control-plane client into a NEW-DB query (each store reads its OWN writer). A replica / nothing keeps the default (owning store's replica), preserving replica offload for plain reads. Adds findRunOnPrimary / findRunOrThrowOnPrimary to PostgresRunStore + the RunStore interface (mirrors the existing findWaitpointOnPrimary primitive); no caller signature changes. Regression test (heteroRunOpsPostgresTest, never mocked) simulates replica lag with a recording proxy and proves read-after-write resolves via the writer for both LEGACY (cuid) and NEW (ksuid) residency, asserts the writer (not replica) was hit, and that plain reads still route to the replica. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… control-plane class at the store write boundary The store can be backed by either the control-plane @trigger.dev/database client or the @internal/run-ops-database client. These are separately generated clients, each with its own copy of the Prisma runtime, so each has its own PrismaClientKnownRequestError class object (identical code, distinct module identity). A P2002 raised by the run-ops client is therefore not `instanceof` the control-plane class, so the webapp's uniform `error instanceof Prisma.PrismaClientKnownRequestError` P2002->422 conversion is skipped and a raw 500 escapes (e.g. idempotent batch re-create returned 500 not 422). PostgresRunStore now wraps its writer/replica clients so any foreign known-request-error is re-thrown as the control-plane Prisma.PrismaClientKnownRequestError (preserving code/message/meta/clientVersion), including inside runInTransaction. This immunizes the whole class of routed-write error catches regardless of which generated client backs the store. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…esenter The batch-results presenter read the batch row via `this._prisma.batchTaskRun.findFirst`, which only hits the control-plane DB. Under the run-ops split a batch (and its items) can live in the NEW run-ops DB, so a bare-constructed presenter (as the results route builds it) missed a NEW-resident batch and returned 404. Route the batch-row lookup through `runStore.findBatchTaskRunByFriendlyId`, whose router probes NEW then LEGACY (dropping the passed client hint under the split, while the non-split single-store path keeps reading the injected client). Mirrors ApiRunResultPresenter's use of `runStore.findRun`. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ency Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…v resolution is unavailable Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
5021523 to
857e7d6
Compare
460477f to
1af2bab
Compare
…nv is null The review fix (c9237f5) took env resolution off the expireRun path: org/ project/environment identity now comes from the run's latest execution snapshot, so the run is fully expired (message acked, waitpoint completed to unblock any parent, runExpired emitted) even when the control-plane resolver returns null. The old test still encoded the pre-fix contract (no event, no ack) and asserted expiredEvents.length === 0. Update it to assert the graceful-degradation contract the fix intends. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
What
Introduces the run-store routing seam and the run-engine read seams that let run lifecycle operations be dispatched to either the control-plane database or a separately-generated run-ops database, depending on where a run/batch resides.
internal-packages/run-store): addsrunOpsStore.tsand substantially expandsPostgresRunStore.tsso the store can resolve residency and route reads/writes to the correct backing client.types.tsgrows the routing/residency types;NoopRunStore.tsis removed.internal-packages/run-engine): addsengine/controlPlaneResolver.tsand routes the per-system read paths (dequeue, enqueue, waitpoint, checkpoint, run-attempt, ttl, delayed-run, execution-snapshot, pending-version, debounce, batch) through the resolver/store instead of talking to a single Prisma client directly.engine/errors.ts,engine/types.ts, andengine/index.tsare extended to support injecting the store/resolver.Three fixes are included on top of the seam work:
c6cadd85f— routes read-your-writes to the owning store's writer, not its lagging replica, so an operation immediately reading back what it just wrote sees a consistent result.05c912e05— normalizes run-ops-generation Prisma errors to the control-plane error class at the store write boundary, soinstanceofchecks and theP2002→ 422 handling continue to work across the separately-generated run-ops Prisma client.88d12907f— resolves NEW-resident batches inApiBatchResultsPresenterby routing the batch read through the store, so a dedicated-DB batch resolves instead of returning 404.The change is heavily test-first: the bulk of the diff is new unit/integration coverage for the store routing, residency, and each run-engine system's control-plane resolver path.
Why
PR4 of the run-ops split stack (PR1–PR3 land the ClickHouse test-container and earlier plumbing). This PR is the read-path foundation: it adds the seam and read-routing but leaves the write path to route through the same seam in a later PR. Behavior-changing where the three fixes above touch existing read-your-writes / error-normalization / batch-resolution paths; otherwise additive (new store module, new resolver, injectable dependencies with existing single-client behavior preserved when no dedicated store is configured).
Tests
Extensive new vitest coverage under
run-store/src/*.test.ts(routing, residency, dual-schema select, cross-generation error normalization, read-after-write, idempotency dedup, mixed residency, waitpoint co-location) andrun-engine/src/engine/**/*.test.ts(per-systemcontrolPlaneResolvertests, injectability, block-edge residency, waitpoint read residency, trigger-create routing, lifecycle router). Testcontainers-backed; no mocks.Notes
Draft, stacked on #4114 (
runops/pr03-clickhouse-tc). Review that first; this diff is against it.Server-change / changeset note to be added at stack-assembly time.
🤖 Generated with Claude Code