-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat(run-ops): read presenters — de-join control-plane relations + read-through hydration #4122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
d-cs
wants to merge
12
commits into
runops/pr07-replication
Choose a base branch
from
runops/pr08-presenters
base: runops/pr07-replication
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
17b96c2
feat(run-ops): read presenters — de-join control-plane relations + re…
d-cs 4d5e0e1
fix(run-ops split): green the run-ops read presenter tests
d-cs a481939
refactor(run-ops): drop known-migrated from read presenters; id-shape…
d-cs 160fe71
chore(run-ops split): strip review-scaffolding comments/labels from p…
d-cs c7fae23
style(run-ops): apply oxfmt
d-cs 7d4dd53
fix(run-ops split): drop dead connectedRuns relation from ApiWaitpoin…
d-cs a7d03c8
test(run-ops): reconcile ApiBatchResultsPresenter split test with rea…
d-cs e57effb
fix(run-ops split): align WaitpointPresenter single-DB fallback and r…
d-cs 665b0f9
test(run-ops): stop RunPresenter read-seam test freezing on the first…
d-cs bbfa75f
test(run-ops): stub org-data-stores registry singleton + deterministi…
d-cs d57fcb3
chore: add server-changes for pr08
d-cs e33cfc4
chore(run-ops): fix lint/format for main lint rules
d-cs File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
6 changes: 6 additions & 0 deletions
6
.server-changes/route-run-presenter-reads-through-run-store.md
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| area: webapp | ||
| type: improvement | ||
| --- | ||
|
|
||
| Route dashboard and API run/batch/waitpoint presenter reads through the run store so they can be served from a separate backing store without changing call sites. |
242 changes: 190 additions & 52 deletions
242
apps/webapp/app/presenters/v3/ApiBatchResultsPresenter.server.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,81 +1,219 @@ | ||
| import type { BatchTaskRunExecutionResult } from "@trigger.dev/core/v3"; | ||
| import { | ||
| $replica, | ||
| type PrismaClientOrTransaction, | ||
| type PrismaReplicaClient, | ||
| prisma, | ||
| } from "~/db.server"; | ||
| import type { TaskRunWithAttempts } from "~/models/taskRun.server"; | ||
| import { executionResultForTaskRun } from "~/models/taskRun.server"; | ||
| import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; | ||
| import { runStore } from "~/v3/runStore.server"; | ||
| import { readThroughRun } from "~/v3/runOpsMigration/readThrough.server"; | ||
| import { runStore as defaultRunStore } from "~/v3/runStore.server"; | ||
| import { BasePresenter } from "./basePresenter.server"; | ||
|
|
||
| /** | ||
| * Run-ops read-through wiring. All optional; absent (or `splitEnabled` falsy) collapses `call` to | ||
| * passthrough. `legacyReplica` is a READ REPLICA handle only — there is NO legacy-primary field. | ||
| */ | ||
| type ApiBatchResultsReadThroughDeps = { | ||
| splitEnabled?: boolean; | ||
| newClient?: PrismaReplicaClient; | ||
| legacyReplica?: PrismaReplicaClient; | ||
| isPastRetention?: (runId: string) => boolean; | ||
| }; | ||
|
|
||
| // The TaskRun shape `executionResultForTaskRun` consumes. Shared by both read sites. | ||
| const memberRunSelect = { | ||
| id: true, | ||
| friendlyId: true, | ||
| status: true, | ||
| taskIdentifier: true, | ||
| attempts: { | ||
| select: { | ||
| status: true, | ||
| output: true, | ||
| outputType: true, | ||
| error: true, | ||
| }, | ||
| orderBy: { | ||
| createdAt: "desc", | ||
| }, | ||
| }, | ||
| } as const; | ||
|
|
||
| /** | ||
| * Split on: the batch row + its item rows resolve new-run-ops first, then the LEGACY RUN-OPS | ||
| * READ REPLICA ONLY (never the legacy primary — there is no such handle); each member run is | ||
| * hydrated independently via readThroughRun keyed on the member runId, so a batch whose members | ||
| * span migrated + abandoned runs returns the complete reachable set (the batch-spanning-the-line | ||
| * read; the dangling-reference termination gate is a separate, adjacent unit). | ||
| * | ||
| * Split off (single-DB / self-host): one passthrough read for the batch row + a single store | ||
| * id-set hydrate for the members — no legacy read, no known-migrated probe, no second connection. | ||
| */ | ||
| export class ApiBatchResultsPresenter extends BasePresenter { | ||
| constructor( | ||
| prismaClient: PrismaClientOrTransaction = prisma, | ||
| replicaClient: PrismaClientOrTransaction = $replica, | ||
| private readonly readThrough?: ApiBatchResultsReadThroughDeps, | ||
| private readonly runStore = defaultRunStore | ||
| ) { | ||
| super(prismaClient, replicaClient); | ||
| } | ||
|
|
||
| public async call( | ||
| friendlyId: string, | ||
| env: AuthenticatedEnvironment | ||
| ): Promise<BatchTaskRunExecutionResult | undefined> { | ||
| return this.traceWithEnv("call", env, async (span) => { | ||
| // Route through the store so a NEW-resident batch resolves under the run-ops split (the | ||
| // router probes NEW→LEGACY and drops this client hint) instead of 404ing on a control-plane read. | ||
| const batchRun = await runStore.findBatchTaskRunByFriendlyId( | ||
| const splitEnabled = this.readThrough?.splitEnabled ?? false; | ||
|
|
||
| if (!splitEnabled) { | ||
| return this.#callPassthrough(friendlyId, env); | ||
| } | ||
|
|
||
| return this.#callSplit(friendlyId, env); | ||
| }); | ||
| } | ||
|
|
||
| // Passthrough: batch row off the replica, members via the single run store. No legacy read. | ||
| async #callPassthrough( | ||
| friendlyId: string, | ||
| env: AuthenticatedEnvironment | ||
| ): Promise<BatchTaskRunExecutionResult | undefined> { | ||
| const batchRun = await this._replica.batchTaskRun.findFirst({ | ||
| where: { | ||
| friendlyId, | ||
| env.id, | ||
| { | ||
| include: { | ||
| items: { | ||
| select: { | ||
| taskRunId: true, | ||
| }, | ||
| }, | ||
| runtimeEnvironmentId: env.id, | ||
| }, | ||
| include: { | ||
| items: { | ||
| select: { | ||
| taskRunId: true, | ||
| }, | ||
| }, | ||
| this._prisma | ||
| ); | ||
| }, | ||
| }); | ||
|
|
||
| if (!batchRun) { | ||
| return undefined; | ||
| } | ||
| if (!batchRun) { | ||
| return undefined; | ||
| } | ||
|
|
||
| const taskRunIds = batchRun.items.map((item) => item.taskRunId); | ||
| const taskRunIds = batchRun.items.map((item) => item.taskRunId); | ||
|
|
||
| if (taskRunIds.length === 0) { | ||
| return { | ||
| id: batchRun.friendlyId, | ||
| items: [], | ||
| }; | ||
| } | ||
| if (taskRunIds.length === 0) { | ||
| return { | ||
| id: batchRun.friendlyId, | ||
| items: [], | ||
| }; | ||
| } | ||
|
|
||
| const taskRuns = await runStore.findRuns( | ||
| { | ||
| where: { id: { in: taskRunIds } }, | ||
| select: { | ||
| id: true, | ||
| friendlyId: true, | ||
| status: true, | ||
| taskIdentifier: true, | ||
| attempts: { | ||
| select: { | ||
| status: true, | ||
| output: true, | ||
| outputType: true, | ||
| error: true, | ||
| }, | ||
| orderBy: { | ||
| createdAt: "desc", | ||
| }, | ||
| const taskRuns = await this.runStore.findRuns( | ||
| { | ||
| where: { id: { in: taskRunIds } }, | ||
| select: memberRunSelect, | ||
| }, | ||
| this._prisma | ||
| ); | ||
|
|
||
| const runMap = new Map(taskRuns.map((run) => [run.id, run])); | ||
|
|
||
| return { | ||
| id: batchRun.friendlyId, | ||
| items: batchRun.items | ||
| .map((item) => { | ||
| const run = runMap.get(item.taskRunId); | ||
| return run ? executionResultForTaskRun(run as TaskRunWithAttempts) : undefined; | ||
| }) | ||
| .filter(Boolean), | ||
| }; | ||
| } | ||
|
|
||
| // Split: resolve the batch row new-first then off the legacy READ REPLICA only (a batch id may | ||
| // be cuid or ksuid, and a cuid-shaped id can still have been backfilled onto NEW, so id-shape | ||
| // residency is not authoritative for the row — the new-first-then-legacy probe is), then | ||
| // hydrate every member run independently via the per-run read-through primitive. | ||
| async #callSplit( | ||
| friendlyId: string, | ||
| env: AuthenticatedEnvironment | ||
| ): Promise<BatchTaskRunExecutionResult | undefined> { | ||
| // Resolve both handles ONCE so the batch row and its members never read from different DBs. | ||
| const newClient = (this.readThrough?.newClient ?? this._replica) as PrismaReplicaClient; | ||
| const legacyReplica = (this.readThrough?.legacyReplica ?? this._replica) as PrismaReplicaClient; | ||
|
|
||
| const readBatch = (client: PrismaClientOrTransaction) => | ||
| client.batchTaskRun.findFirst({ | ||
| where: { | ||
| friendlyId, | ||
| runtimeEnvironmentId: env.id, | ||
| }, | ||
| include: { | ||
| items: { | ||
| select: { | ||
| taskRunId: true, | ||
| }, | ||
| }, | ||
| }, | ||
| this._prisma | ||
| ); | ||
| }); | ||
|
|
||
| let batchRun = await readBatch(newClient); | ||
|
|
||
| // Legacy READ REPLICA probe, only on a new-probe miss; skipped when past retention. | ||
| if (!batchRun && !this.readThrough?.isPastRetention?.(friendlyId)) { | ||
| batchRun = await readBatch(legacyReplica); | ||
| } | ||
|
|
||
| const runMap = new Map(taskRuns.map((run) => [run.id, run])); | ||
| if (!batchRun) { | ||
| return undefined; | ||
| } | ||
|
|
||
| if (batchRun.items.length === 0) { | ||
| return { | ||
| id: batchRun.friendlyId, | ||
| items: batchRun.items | ||
| .map((item) => { | ||
| const run = runMap.get(item.taskRunId); | ||
| return run ? executionResultForTaskRun(run as TaskRunWithAttempts) : undefined; | ||
| }) | ||
| .filter(Boolean), | ||
| items: [], | ||
| }; | ||
| }); | ||
| } | ||
|
|
||
| const readMemberRun = (client: PrismaClientOrTransaction, taskRunId: string) => | ||
| client.taskRun.findFirst({ | ||
| where: { id: taskRunId }, | ||
| select: memberRunSelect, | ||
| }) as Promise<TaskRunWithAttempts | null>; | ||
|
|
||
| // Per-member fan-out: each member may live on a different DB, so a single nested include cannot | ||
| // cross the seam. Promise.all preserves batchRun.items order, unchanged from today. | ||
| const memberResults = await Promise.all( | ||
| batchRun.items.map(async (item) => { | ||
| const result = await readThroughRun<TaskRunWithAttempts>({ | ||
| runId: item.taskRunId, | ||
| environmentId: env.id, | ||
| readNew: (client) => readMemberRun(client, item.taskRunId), | ||
| readLegacy: (replica) => readMemberRun(replica, item.taskRunId), | ||
| deps: { | ||
| splitEnabled: true, | ||
| // Pass the SAME resolved handles the batch row used, so the batch row and its members | ||
| // never resolve against different DBs. (Letting these fall through to readThroughRun's | ||
| // own module-level defaults would diverge from the batch read's `?? this._replica`.) | ||
| newClient, | ||
| legacyReplica, | ||
| isPastRetention: this.readThrough?.isPastRetention, | ||
| }, | ||
| }); | ||
|
|
||
| // not-found / past-retention members are omitted (matches today's drop-undefined behavior); | ||
| // the dangling-reference termination gate (separate unit) governs whether that's permitted. | ||
| if (result.source === "not-found" || result.source === "past-retention") { | ||
| return undefined; | ||
| } | ||
|
|
||
| return executionResultForTaskRun(result.value); | ||
| }) | ||
| ); | ||
|
|
||
| return { | ||
| id: batchRun.friendlyId, | ||
| items: memberResults.filter(Boolean), | ||
| }; | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.