From d91b1923f28a6cd18e888397e482bda5403a39c2 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 1 Jul 2026 16:20:02 +0100 Subject: [PATCH 1/6] =?UTF-8?q?feat(run-ops):=20activation=20=E2=80=94=20d?= =?UTF-8?q?rop=20cross-DB=20Prisma=20FKs,=20provision=20run-ops=20DB,=20en?= =?UTF-8?q?able=20split?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.8 (1M context) --- .../batchTaskRunEnvironmentFkDrop.test.ts | 169 +++++ .../test/dropTaskRunToTaskRunTagJoin.test.ts | 66 ++ .../runOpsCascadeCleanup.server.test.ts | 665 ++++++++++++++++++ .../migration.sql | 9 + .../migration.sql | 2 + .../migration.sql | 2 + .../migration.sql | 26 + .../migration.sql | 7 + .../database/prisma/schema.prisma | 14 +- ...stgresRunStore.controlPlaneAlertFk.test.ts | 127 ++++ 10 files changed, 1075 insertions(+), 12 deletions(-) create mode 100644 apps/webapp/test/batchTaskRunEnvironmentFkDrop.test.ts create mode 100644 apps/webapp/test/dropTaskRunToTaskRunTagJoin.test.ts create mode 100644 apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts create mode 100644 internal-packages/database/prisma/migrations/20260626130000_drop_task_run_to_task_run_tag_join/migration.sql create mode 100644 internal-packages/database/prisma/migrations/20260626140000_drop_bulk_action_item_group_fk/migration.sql create mode 100644 internal-packages/database/prisma/migrations/20260626150000_drop_batch_task_run_environment_fk/migration.sql create mode 100644 internal-packages/database/prisma/migrations/20260629120000_drop_run_ops_control_plane_foreign_keys/migration.sql create mode 100644 internal-packages/database/prisma/migrations/20260630120000_drop_project_alert_run_subgraph_foreign_keys/migration.sql create mode 100644 internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts diff --git a/apps/webapp/test/batchTaskRunEnvironmentFkDrop.test.ts b/apps/webapp/test/batchTaskRunEnvironmentFkDrop.test.ts new file mode 100644 index 00000000000..ba8c29a1c38 --- /dev/null +++ b/apps/webapp/test/batchTaskRunEnvironmentFkDrop.test.ts @@ -0,0 +1,169 @@ +// Proof for dropping the canonical BatchTaskRun -> RuntimeEnvironment FK +// (constraint "BatchTaskRun_runtimeEnvironmentId_fkey", onDelete: Cascade) while keeping the +// runtimeEnvironmentId scalar and its compound @@unique/@@index. BatchTaskRun is run-ops and +// RuntimeEnvironment is control-plane, so the two may live on different servers; create-time +// integrity is preserved app-side via the ControlPlaneResolver's assertEnvExists. Env-delete +// orphan cleanup is handled separately — here the batch row is tolerated. + +import { heteroPostgresTest, postgresTest } from "@internal/testcontainers"; +import type { PrismaClient } from "@trigger.dev/database"; +import { describe, expect, vi } from "vitest"; +import { ControlPlaneCache } from "~/v3/runOpsMigration/controlPlaneCache.server"; +import { + ControlPlaneReferenceError, + ControlPlaneResolver, +} from "~/v3/runOpsMigration/controlPlaneResolver.server"; + +// Cross-DB testcontainer spin-up + queries can exceed the 5s default on the first test. +vi.setConfig({ testTimeout: 60_000 }); + +let seedCounter = 0; + +async function seedEnvironment(prisma: PrismaClient) { + const n = seedCounter++; + const organization = await prisma.organization.create({ + data: { title: `Org ${n}`, slug: `org-${n}` }, + }); + const project = await prisma.project.create({ + data: { + name: `Project ${n}`, + slug: `project-${n}`, + externalRef: `proj_${n}`, + organizationId: organization.id, + }, + }); + const environment = await prisma.runtimeEnvironment.create({ + data: { + type: "PRODUCTION", + slug: `env-${n}`, + projectId: project.id, + organizationId: organization.id, + apiKey: `tr_prod_${n}`, + pkApiKey: `pk_prod_${n}`, + shortcode: `short_${n}`, + }, + }); + return { organization, project, environment }; +} + +let batchCounter = 0; + +async function createBatch(prisma: PrismaClient, runtimeEnvironmentId: string) { + const n = batchCounter++; + return prisma.batchTaskRun.create({ + data: { + friendlyId: `batch_${n}`, + runtimeEnvironmentId, + runCount: 1, + runIds: [], + batchVersion: "runengine:v2", + }, + }); +} + +// Asserts the post-migration state of BatchTaskRun on a given client: the FK is gone, but the +// scalar and both compound constraints are retained. Shared by the single-version and the +// cross-version suites. +async function assertSchemaState(prisma: PrismaClient) { + const foreignKeys = await prisma.$queryRaw<{ constraint_name: string }[]>` + SELECT constraint_name + FROM information_schema.table_constraints + WHERE table_name = 'BatchTaskRun' + AND constraint_type = 'FOREIGN KEY' + `; + expect(foreignKeys.map((c) => c.constraint_name)).not.toContain( + "BatchTaskRun_runtimeEnvironmentId_fkey" + ); + + const columns = await prisma.$queryRaw<{ column_name: string }[]>` + SELECT column_name + FROM information_schema.columns + WHERE table_name = 'BatchTaskRun' + AND column_name = 'runtimeEnvironmentId' + `; + expect(columns).toHaveLength(1); + + // The @@unique([runtimeEnvironmentId, idempotencyKey]) and + // @@index([runtimeEnvironmentId, id(sort: Desc)]) both survive the FK drop. + const indexes = await prisma.$queryRaw<{ indexdef: string }[]>` + SELECT indexdef FROM pg_indexes WHERE tablename = 'BatchTaskRun' + `; + const defs = indexes.map((i) => i.indexdef); + const hasUnique = defs.some( + (d) => /UNIQUE/i.test(d) && d.includes("runtimeEnvironmentId") && d.includes("idempotencyKey") + ); + const hasIndex = defs.some( + (d) => !/UNIQUE/i.test(d) && d.includes("runtimeEnvironmentId") && /\bid\b/.test(d) + ); + expect(hasUnique).toBe(true); + expect(hasIndex).toBe(true); +} + +// Inserts an env + batch, deletes the env, and asserts the batch survives (cascade gone). +async function assertOrphanTolerated(prisma: PrismaClient) { + const { environment } = await seedEnvironment(prisma); + const batch = await createBatch(prisma, environment.id); + + await prisma.runtimeEnvironment.delete({ where: { id: environment.id } }); + + const survivor = await prisma.batchTaskRun.findFirst({ where: { id: batch.id } }); + expect(survivor).not.toBeNull(); + expect(survivor?.runtimeEnvironmentId).toBe(environment.id); +} + +describe("drop BatchTaskRun -> RuntimeEnvironment FK", () => { + postgresTest("FK constraint absent; scalar + unique + index retained", async ({ prisma }) => { + await assertSchemaState(prisma); + }); + + postgresTest( + "deleting the env leaves the BatchTaskRun row alive (no cascade; orphan cleanup handled separately)", + async ({ prisma }) => { + await assertOrphanTolerated(prisma); + } + ); + + postgresTest( + "app-side env validation: assertEnvExists rejects an invalid env and a valid-env create succeeds by scalar", + async ({ prisma }) => { + const { environment } = await seedEnvironment(prisma); + + const resolver = new ControlPlaneResolver({ + controlPlanePrimary: prisma, + controlPlaneReplica: prisma, + cache: new ControlPlaneCache(), + splitEnabled: () => true, + }); + + // The exact guard call the create services place before batchTaskRun.create. + await expect(resolver.assertEnvExists("env_does_not_exist")).rejects.toBeInstanceOf( + ControlPlaneReferenceError + ); + + await expect(resolver.assertEnvExists(environment.id)).resolves.toBeUndefined(); + + // Once the guard passes, the batch is linked by the runtimeEnvironmentId scalar (no FK). + const batch = await createBatch(prisma, environment.id); + expect(batch.runtimeEnvironmentId).toBe(environment.id); + } + ); +}); + +// Cross-version gate: the migration applies and the post-state is identical across major versions. +describe("drop BatchTaskRun -> RuntimeEnvironment FK — cross-version (legacy + new Postgres)", () => { + heteroPostgresTest( + "migration applies and FK is absent on both the legacy and new databases", + async ({ prisma14, prisma17 }) => { + await assertSchemaState(prisma14); + await assertSchemaState(prisma17); + } + ); + + heteroPostgresTest( + "env delete leaves the batch orphaned on both the legacy and new databases", + async ({ prisma14, prisma17 }) => { + await assertOrphanTolerated(prisma14); + await assertOrphanTolerated(prisma17); + } + ); +}); diff --git a/apps/webapp/test/dropTaskRunToTaskRunTagJoin.test.ts b/apps/webapp/test/dropTaskRunToTaskRunTagJoin.test.ts new file mode 100644 index 00000000000..e6f270aa861 --- /dev/null +++ b/apps/webapp/test/dropTaskRunToTaskRunTagJoin.test.ts @@ -0,0 +1,66 @@ +// Single-version proof for dropping the dead `_TaskRunToTaskRunTag` implicit join. The +// cross-version follow-up (via heteroPostgresTest) is tracked separately; those helpers +// do not exist in this tree yet. + +import { describe, expect } from "vitest"; +import { postgresTest } from "@internal/testcontainers"; + +describe("drop _TaskRunToTaskRunTag implicit join", () => { + postgresTest("runTags scalar round-trips and the join table is gone", async ({ prisma }) => { + const organization = await prisma.organization.create({ + data: { + title: "test", + slug: "test", + }, + }); + + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + + const taskRun = await prisma.taskRun.create({ + data: { + friendlyId: "run_1234", + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + payloadType: "application/json", + traceId: "1234", + spanId: "1234", + queue: "test", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + runTags: ["alpha", "beta"], + }, + }); + + const readBack = await prisma.taskRun.findFirstOrThrow({ + where: { id: taskRun.id }, + }); + expect(readBack.runTags).toEqual(["alpha", "beta"]); + + const result = await prisma.$queryRaw<{ t: string | null }[]>` + SELECT to_regclass('public._TaskRunToTaskRunTag')::text as t + `; + expect(result[0].t).toBeNull(); + }); +}); diff --git a/apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts b/apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts new file mode 100644 index 00000000000..2065ae06b0a --- /dev/null +++ b/apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts @@ -0,0 +1,665 @@ +import { heteroPostgresTest, heteroRunOpsPostgresTest } from "@internal/testcontainers"; +import type { PrismaClient } from "@trigger.dev/database"; +import type { RunOpsPrismaClient } from "@internal/run-ops-database"; +import { describe, expect, vi } from "vitest"; +import { RunOpsCascadeCleanupService } from "~/v3/runOpsMigration/runOpsCascadeCleanup.server"; + +// Cross-DB testcontainer spin-up + the multi-table seed can exceed the 5s default. +vi.setConfig({ testTimeout: 120_000 }); + +// Run-subgraph tables that live in BOTH the control-plane schema AND the dedicated run-ops SUBSET +// schema, so they are deleted on every run-ops writer. +const SUBGRAPH_TABLES = [ + "taskRun", + "taskRunAttempt", + "waitpoint", + "taskRunWaitpoint", + "taskRunCheckpoint", + "checkpoint", + "checkpointRestoreEvent", + "batchTaskRun", +] as const; + +// Control-plane-resident model: exists ONLY in @trigger.dev/database, NOT in the run-ops subset. +// Seeded + counted only against the control-plane writer. +const CONTROL_PLANE_TABLES = ["bulkActionItem"] as const; + +type SubgraphTable = (typeof SUBGRAPH_TABLES)[number]; +type ControlPlaneTable = (typeof CONTROL_PLANE_TABLES)[number]; + +let seedCounter = 0; + +/** + * The cross-seam (run-ops -> control-plane) Cascade FKs that the cloud DB physically drops. Applied + * to the FK-dropped fixture to model cloud; the other side keeps them to model self-host. Only the + * run-subgraph constraints exist on the dedicated run-ops schema; BulkActionItem's are control-plane + * only and are dropped separately on a full-schema client. + */ +const SUBGRAPH_CROSS_SEAM_FKS: Array<{ table: string; constraint: string }> = [ + { table: "TaskRun", constraint: "TaskRun_runtimeEnvironmentId_fkey" }, + { table: "TaskRun", constraint: "TaskRun_projectId_fkey" }, + { table: "TaskRunAttempt", constraint: "TaskRunAttempt_runtimeEnvironmentId_fkey" }, + { table: "Waitpoint", constraint: "Waitpoint_environmentId_fkey" }, + { table: "Waitpoint", constraint: "Waitpoint_projectId_fkey" }, + { table: "TaskRunWaitpoint", constraint: "TaskRunWaitpoint_projectId_fkey" }, + { table: "TaskRunCheckpoint", constraint: "TaskRunCheckpoint_runtimeEnvironmentId_fkey" }, + { table: "TaskRunCheckpoint", constraint: "TaskRunCheckpoint_projectId_fkey" }, + { table: "Checkpoint", constraint: "Checkpoint_runtimeEnvironmentId_fkey" }, + { table: "Checkpoint", constraint: "Checkpoint_projectId_fkey" }, + { + table: "CheckpointRestoreEvent", + constraint: "CheckpointRestoreEvent_runtimeEnvironmentId_fkey", + }, + { table: "CheckpointRestoreEvent", constraint: "CheckpointRestoreEvent_projectId_fkey" }, + { table: "BatchTaskRun", constraint: "BatchTaskRun_runtimeEnvironmentId_fkey" }, +]; + +const BULK_ACTION_CROSS_SEAM_FKS: Array<{ table: string; constraint: string }> = [ + { table: "BulkActionItem", constraint: "BulkActionItem_sourceRunId_fkey" }, + { table: "BulkActionItem", constraint: "BulkActionItem_destinationRunId_fkey" }, +]; + +async function dropCrossSeamFks( + prisma: { $executeRawUnsafe: (q: string) => Promise }, + fks: Array<{ table: string; constraint: string }> +) { + for (const { table, constraint } of fks) { + await prisma.$executeRawUnsafe( + `ALTER TABLE "${table}" DROP CONSTRAINT IF EXISTS "${constraint}"` + ); + } +} + +type Scope = { projectId: string; environmentId: string; organizationId: string }; +type FullScope = Scope & { workerTaskId: string; queueId: string; backgroundWorkerId: string }; + +// Minimal structural client covering the control-plane prerequisites + run-subgraph models the +// seed/count helpers touch. Both PrismaClient and RunOpsPrismaClient are assignable. +type SeedClient = { + organization: any; + project: any; + runtimeEnvironment: any; + backgroundWorker: any; + backgroundWorkerTask: any; + taskQueue: any; + taskRun: any; + taskRunAttempt: any; + waitpoint: any; + taskRunWaitpoint: any; + taskRunCheckpoint: any; + checkpoint: any; + checkpointRestoreEvent: any; + batchTaskRun: any; +}; + +// Synthetic scope for the dedicated run-ops subset client, whose schema scalarizes every +// control-plane FK so no org/project/env rows are required. +function makeSyntheticScope(): FullScope { + const n = seedCounter++; + return { + projectId: `proj_synthetic_${n}`, + environmentId: `env_synthetic_${n}`, + organizationId: `org_synthetic_${n}`, + workerTaskId: `task_synthetic_${n}`, + queueId: `queue_synthetic_${n}`, + backgroundWorkerId: `worker_synthetic_${n}`, + }; +} + +/** Create the control-plane prerequisites (org, project, env, worker, task, queue). */ +async function seedScope(prisma: SeedClient): Promise { + const n = seedCounter++; + const org = await prisma.organization.create({ + data: { title: `Org ${n}`, slug: `org-${n}` }, + }); + const project = await prisma.project.create({ + data: { + name: `Project ${n}`, + slug: `project-${n}`, + externalRef: `proj_${n}`, + organizationId: org.id, + }, + }); + const environment = await prisma.runtimeEnvironment.create({ + data: { + type: "PRODUCTION", + slug: `env-${n}`, + projectId: project.id, + organizationId: org.id, + apiKey: `tr_prod_${n}`, + pkApiKey: `pk_prod_${n}`, + shortcode: `short_${n}`, + }, + }); + const worker = await prisma.backgroundWorker.create({ + data: { + friendlyId: `worker_${n}`, + contentHash: `hash_${n}`, + projectId: project.id, + runtimeEnvironmentId: environment.id, + version: `2024.1.${n}`, + metadata: {}, + engine: "V2", + }, + }); + const task = await prisma.backgroundWorkerTask.create({ + data: { + friendlyId: `task_${n}`, + slug: `my-task-${n}`, + filePath: "index.ts", + exportName: "myTask", + workerId: worker.id, + runtimeEnvironmentId: environment.id, + projectId: project.id, + }, + }); + const queue = await prisma.taskQueue.create({ + data: { + friendlyId: `queue_${n}`, + name: `task/my-task-${n}`, + runtimeEnvironmentId: environment.id, + projectId: project.id, + }, + }); + return { + projectId: project.id, + environmentId: environment.id, + organizationId: org.id, + workerTaskId: task.id, + queueId: queue.id, + backgroundWorkerId: worker.id, + }; +} + +/** + * Seed one full run-ops subgraph for a scope: a TaskRun tree (root + child), an attempt, a + * Waitpoint with a blocking edge (TaskRunWaitpoint), a TaskRunCheckpoint, a Checkpoint + a + * CheckpointRestoreEvent, and a BatchTaskRun with a member run. Returns the source + destination + * runs so a caller with a control-plane client can attach a BulkActionItem. + */ +async function seedRunOpsSubgraph( + prisma: SeedClient, + scope: Scope & { backgroundWorkerId: string; workerTaskId: string; queueId: string } +): Promise<{ sourceRunId: string; destinationRunId: string }> { + const n = seedCounter++; + const { projectId, environmentId } = scope; + + const baseRun = (suffix: string) => ({ + friendlyId: `run_${n}_${suffix}`, + taskIdentifier: `my-task-${n}`, + payload: "{}", + payloadType: "application/json", + traceId: `trace_${n}_${suffix}`, + spanId: `span_${n}_${suffix}`, + queue: `task/my-task-${n}`, + runtimeEnvironmentId: environmentId, + projectId, + }); + + const rootRun = await prisma.taskRun.create({ data: baseRun("root") }); + const childRun = await prisma.taskRun.create({ + data: { ...baseRun("child"), parentTaskRunId: rootRun.id, rootTaskRunId: rootRun.id }, + }); + + const attempt = await prisma.taskRunAttempt.create({ + data: { + friendlyId: `attempt_${n}`, + taskRunId: rootRun.id, + backgroundWorkerId: scope.backgroundWorkerId, + backgroundWorkerTaskId: scope.workerTaskId, + runtimeEnvironmentId: environmentId, + queueId: scope.queueId, + }, + }); + + const waitpoint = await prisma.waitpoint.create({ + data: { + friendlyId: `wp_${n}`, + type: "MANUAL", + idempotencyKey: `wp_idem_${n}`, + userProvidedIdempotencyKey: false, + environmentId, + projectId, + }, + }); + await prisma.taskRunWaitpoint.create({ + data: { taskRunId: rootRun.id, waitpointId: waitpoint.id, projectId }, + }); + + await prisma.taskRunCheckpoint.create({ + data: { + friendlyId: `trcp_${n}`, + type: "DOCKER", + location: "loc", + runtimeEnvironmentId: environmentId, + projectId, + }, + }); + + const checkpoint = await prisma.checkpoint.create({ + data: { + friendlyId: `cp_${n}`, + type: "DOCKER", + location: "loc", + imageRef: "ref", + runId: rootRun.id, + attemptId: attempt.id, + runtimeEnvironmentId: environmentId, + projectId, + }, + }); + await prisma.checkpointRestoreEvent.create({ + data: { + type: "CHECKPOINT", + checkpointId: checkpoint.id, + runId: rootRun.id, + attemptId: attempt.id, + runtimeEnvironmentId: environmentId, + projectId, + }, + }); + + const batch = await prisma.batchTaskRun.create({ + data: { friendlyId: `batch_${n}`, runtimeEnvironmentId: environmentId }, + }); + await prisma.taskRun.update({ where: { id: childRun.id }, data: { batchId: batch.id } }); + + const sourceRun = await prisma.taskRun.create({ data: baseRun("src") }); + const destRun = await prisma.taskRun.create({ data: baseRun("dst") }); + return { sourceRunId: sourceRun.id, destinationRunId: destRun.id }; +} + +/** Attach a BulkActionItem (control-plane-resident) over the given source/destination runs. */ +async function seedBulkActionItem( + prisma: PrismaClient, + runs: { sourceRunId: string; destinationRunId: string } +): Promise { + await prisma.bulkActionItem.create({ + data: { + groupId: `grp_${seedCounter++}`, + type: "REPLAY", + sourceRunId: runs.sourceRunId, + destinationRunId: runs.destinationRunId, + }, + }); +} + +async function subgraphCountsForScope( + prisma: SeedClient, + scope: { projectId: string } +): Promise> { + const { projectId } = scope; + return { + taskRun: await prisma.taskRun.count({ where: { projectId } }), + taskRunAttempt: await prisma.taskRunAttempt.count({ where: { taskRun: { projectId } } }), + waitpoint: await prisma.waitpoint.count({ where: { projectId } }), + taskRunWaitpoint: await prisma.taskRunWaitpoint.count({ where: { projectId } }), + taskRunCheckpoint: await prisma.taskRunCheckpoint.count({ where: { projectId } }), + checkpoint: await prisma.checkpoint.count({ where: { projectId } }), + checkpointRestoreEvent: await prisma.checkpointRestoreEvent.count({ where: { projectId } }), + batchTaskRun: await prisma.batchTaskRun.count({ + where: { runs: { some: { projectId } } }, + }), + }; +} + +async function bulkActionItemCountForScope( + prisma: PrismaClient, + scope: { projectId: string } +): Promise { + return prisma.bulkActionItem.count({ where: { sourceRun: { projectId: scope.projectId } } }); +} + +function expectSubgraphAllZero(counts: Record) { + for (const table of SUBGRAPH_TABLES) { + expect(counts[table], `${table} should be empty`).toBe(0); + } +} + +function expectSubgraphAllNonZero(counts: Record) { + for (const table of SUBGRAPH_TABLES) { + expect(counts[table], `${table} should be seeded`).toBeGreaterThan(0); + } +} + +describe("RunOpsCascadeCleanupService", () => { + // REGRESSION: the NEW run-ops writer is a real RunOpsPrismaClient over the dedicated + // SUBSET schema — it has NO `bulkActionItem` delegate. Before the fix, the per-writer pass called + // `writer.bulkActionItem.deleteMany` on this client => TypeError (Cannot read properties of + // undefined). After the fix, BulkActionItem is cleaned ONLY on the control-plane writer (prisma14), + // and the run-subgraph is deleted on the NEW DB without throwing. + heteroRunOpsPostgresTest( + "cleanupProject does not throw on the dedicated RunOpsPrismaClient and clears the new DB subgraph", + async ({ prisma14, prisma17 }) => { + await dropCrossSeamFks(prisma14, BULK_ACTION_CROSS_SEAM_FKS); + + // The dedicated run-ops subset schema scalarizes every control-plane FK, so the NEW DB needs + // NO org/project/env prereqs — seed the subgraph directly with synthetic scope ids. + const newScope = makeSyntheticScope(); + await seedRunOpsSubgraph(prisma17 as unknown as SeedClient, newScope); + + // BulkActionItem (control-plane-resident) lives only on the control-plane DB. + const cp = await seedScope(prisma14); + const cpRuns = await seedRunOpsSubgraph(prisma14, cp); + await seedBulkActionItem(prisma14, cpRuns); + + // prisma17 is a real RunOpsPrismaClient (subset, no bulkActionItem delegate); prisma14 is the + // control-plane writer. Before the fix this threw a TypeError on writer.bulkActionItem. + const result = await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17, prisma14 as unknown as RunOpsPrismaClient], + controlPlaneWriter: prisma14, + }).cleanupProject(newScope.projectId); + + expectSubgraphAllZero( + await subgraphCountsForScope(prisma17 as unknown as SeedClient, newScope) + ); + // BulkActionItem cleanup ran against the control-plane writer and deleted the control-plane + // project's item; the subset client was never asked for the missing delegate. + expect(result.bulkActionItem).toBeGreaterThanOrEqual(0); + } + ); + + // REGRESSION (env variant): same guarantee for cleanupEnvironment. + heteroRunOpsPostgresTest( + "cleanupEnvironment does not throw on the dedicated RunOpsPrismaClient and clears the new DB subgraph", + async ({ prisma14, prisma17 }) => { + await dropCrossSeamFks(prisma14, BULK_ACTION_CROSS_SEAM_FKS); + + const newScope = makeSyntheticScope(); + await seedRunOpsSubgraph(prisma17 as unknown as SeedClient, newScope); + + const cp = await seedScope(prisma14); + const cpRuns = await seedRunOpsSubgraph(prisma14, cp); + await seedBulkActionItem(prisma14, cpRuns); + + const result = await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17, prisma14 as unknown as RunOpsPrismaClient], + controlPlaneWriter: prisma14, + }).cleanupEnvironment(newScope.environmentId); + + expectSubgraphAllZero( + await subgraphCountsForScope(prisma17 as unknown as SeedClient, newScope) + ); + expect(result.bulkActionItem).toBeGreaterThanOrEqual(0); + } + ); + + // Test A: env cleanup over both writers empties the subgraph on BOTH DBs + BulkActionItem on the + // control-plane DB; a sibling scope survives. + heteroPostgresTest( + "cleanupEnvironment empties the subgraph across both writers, isolating a sibling env", + async ({ prisma14, prisma17 }) => { + await dropCrossSeamFks(prisma17, SUBGRAPH_CROSS_SEAM_FKS); + await dropCrossSeamFks(prisma17, BULK_ACTION_CROSS_SEAM_FKS); + + const target14 = await seedScope(prisma14); + const target17 = await seedScope(prisma17); + const targetRuns14 = await seedRunOpsSubgraph(prisma14, target14); + await seedRunOpsSubgraph(prisma17, target17); + await seedBulkActionItem(prisma14, targetRuns14); + + const sibling14 = await seedScope(prisma14); + const sibling17 = await seedScope(prisma17); + const siblingRuns14 = await seedRunOpsSubgraph(prisma14, sibling14); + await seedRunOpsSubgraph(prisma17, sibling17); + await seedBulkActionItem(prisma14, siblingRuns14); + + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17, prisma14], + controlPlaneWriter: prisma14, + }).cleanupEnvironment(target14.environmentId); + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17, prisma14], + controlPlaneWriter: prisma14, + }).cleanupEnvironment(target17.environmentId); + + expectSubgraphAllZero(await subgraphCountsForScope(prisma14, target14)); + expectSubgraphAllZero(await subgraphCountsForScope(prisma17, target17)); + expect(await bulkActionItemCountForScope(prisma14, target14)).toBe(0); + expectSubgraphAllNonZero(await subgraphCountsForScope(prisma14, sibling14)); + expectSubgraphAllNonZero(await subgraphCountsForScope(prisma17, sibling17)); + expect(await bulkActionItemCountForScope(prisma14, sibling14)).toBeGreaterThan(0); + } + ); + + // Test B: project cleanup over both writers. + heteroPostgresTest( + "cleanupProject empties the subgraph across both writers, isolating a sibling project", + async ({ prisma14, prisma17 }) => { + await dropCrossSeamFks(prisma17, SUBGRAPH_CROSS_SEAM_FKS); + await dropCrossSeamFks(prisma17, BULK_ACTION_CROSS_SEAM_FKS); + + const target14 = await seedScope(prisma14); + const target17 = await seedScope(prisma17); + const targetRuns14 = await seedRunOpsSubgraph(prisma14, target14); + await seedRunOpsSubgraph(prisma17, target17); + await seedBulkActionItem(prisma14, targetRuns14); + + const sibling14 = await seedScope(prisma14); + const sibling17 = await seedScope(prisma17); + const siblingRuns14 = await seedRunOpsSubgraph(prisma14, sibling14); + await seedRunOpsSubgraph(prisma17, sibling17); + await seedBulkActionItem(prisma14, siblingRuns14); + + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17, prisma14], + controlPlaneWriter: prisma14, + }).cleanupProject(target14.projectId); + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17, prisma14], + controlPlaneWriter: prisma14, + }).cleanupProject(target17.projectId); + + expectSubgraphAllZero(await subgraphCountsForScope(prisma14, target14)); + expectSubgraphAllZero(await subgraphCountsForScope(prisma17, target17)); + expect(await bulkActionItemCountForScope(prisma14, target14)).toBe(0); + expectSubgraphAllNonZero(await subgraphCountsForScope(prisma14, sibling14)); + expectSubgraphAllNonZero(await subgraphCountsForScope(prisma17, sibling17)); + expect(await bulkActionItemCountForScope(prisma14, sibling14)).toBeGreaterThan(0); + } + ); + + // Test C: idempotency — a second cleanup returns all-zero counts and does not throw on either DB. + heteroPostgresTest( + "cleanupEnvironment is idempotent on a re-run across both FK configs", + async ({ prisma14, prisma17 }) => { + await dropCrossSeamFks(prisma17, SUBGRAPH_CROSS_SEAM_FKS); + await dropCrossSeamFks(prisma17, BULK_ACTION_CROSS_SEAM_FKS); + + const t14 = await seedScope(prisma14); + const t17 = await seedScope(prisma17); + const runs14 = await seedRunOpsSubgraph(prisma14, t14); + await seedRunOpsSubgraph(prisma17, t17); + await seedBulkActionItem(prisma14, runs14); + + const svc14 = new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma14], + controlPlaneWriter: prisma14, + }); + const svc17 = new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17], + controlPlaneWriter: prisma14, + }); + await svc14.cleanupEnvironment(t14.environmentId); + await svc17.cleanupEnvironment(t17.environmentId); + + const second14 = await svc14.cleanupEnvironment(t14.environmentId); + const second17 = await svc17.cleanupEnvironment(t17.environmentId); + + for (const result of [second14, second17]) { + for (const count of Object.values(result)) { + expect(count).toBe(0); + } + } + expectSubgraphAllZero(await subgraphCountsForScope(prisma14, t14)); + expectSubgraphAllZero(await subgraphCountsForScope(prisma17, t17)); + } + ); + + // Test D: FK-retained vs FK-dropped fixtures reach an identical run-subgraph end-state. + heteroPostgresTest( + "FK-retained and FK-dropped fixtures reach an identical end-state after cleanup", + async ({ prisma14, prisma17 }) => { + await dropCrossSeamFks(prisma17, SUBGRAPH_CROSS_SEAM_FKS); + + const s14 = await seedScope(prisma14); + const s17 = await seedScope(prisma17); + await seedRunOpsSubgraph(prisma14, s14); + await seedRunOpsSubgraph(prisma17, s17); + + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma14], + controlPlaneWriter: prisma14, + }).cleanupEnvironment(s14.environmentId); + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17], + controlPlaneWriter: prisma17, + }).cleanupEnvironment(s17.environmentId); + + const counts14 = await subgraphCountsForScope(prisma14, s14); + const counts17 = await subgraphCountsForScope(prisma17, s17); + expect(counts17).toEqual(counts14); + } + ); + + // Test E: single-DB mode — the same client passed twice de-dups so the pass runs once. + heteroPostgresTest( + "single-DB: the same client passed twice de-dups so the delete pass runs exactly once", + async ({ prisma14 }) => { + const scope = await seedScope(prisma14); + await seedRunOpsSubgraph(prisma14, scope); + + const before = await subgraphCountsForScope(prisma14, scope); + + // Wrap the real client with a $extends query hook that counts deleteMany calls per model. NOT + // a mock — the query still runs against the container. If de-dup failed, the loop would run + // twice against this same client and taskRun.deleteMany would fire twice. + let taskRunDeleteManyCalls = 0; + const counting = prisma14.$extends({ + query: { + taskRun: { + async deleteMany({ args, query }) { + taskRunDeleteManyCalls++; + return query(args); + }, + }, + }, + }) as unknown as typeof prisma14; + + const result = await new RunOpsCascadeCleanupService({ + runOpsWriters: [counting, counting], + controlPlaneWriter: counting, + }).cleanupEnvironment(scope.environmentId); + + // De-dup ran the pass exactly once: one taskRun.deleteMany, count not double-summed. + expect(taskRunDeleteManyCalls).toBe(1); + expect(result.taskRun).toBe(before.taskRun); + expectSubgraphAllZero(await subgraphCountsForScope(prisma14, scope)); + } + ); + + // Test F: the two-writer split — an env whose rows straddle both DBs (cuid runs on the LEGACY DB, + // ksuid runs on the NEW DB) is fully cleaned by one call; a single-writer service leaks orphans. + heteroPostgresTest( + "two-writer fan-out cleans a split env on both DBs; single-writer leaves orphans", + async ({ prisma14, prisma17 }) => { + await dropCrossSeamFks(prisma17, SUBGRAPH_CROSS_SEAM_FKS); + + // One logical env that exists on both DBs (control-plane prereqs seeded on each), with the + // SAME env id, modelling the reference-equal control-plane row. We force a shared id + // by creating the legacy scope first, then mirroring its env id onto the new DB. + const legacy = await seedScope(prisma14); + const newOrg = await prisma17.organization.create({ + data: { id: legacy.organizationId, title: "mirror", slug: `mirror-${seedCounter++}` }, + }); + const newProject = await prisma17.project.create({ + data: { + id: legacy.projectId, + name: "mirror", + slug: `mirror-${seedCounter++}`, + externalRef: `mirror_${seedCounter++}`, + organizationId: newOrg.id, + }, + }); + const newEnv = await prisma17.runtimeEnvironment.create({ + data: { + id: legacy.environmentId, + type: "PRODUCTION", + slug: `mirror-${seedCounter++}`, + projectId: newProject.id, + organizationId: newOrg.id, + apiKey: `tr_${seedCounter++}`, + pkApiKey: `pk_${seedCounter++}`, + shortcode: `sc_${seedCounter++}`, + }, + }); + const newWorker = await prisma17.backgroundWorker.create({ + data: { + friendlyId: `w_${seedCounter++}`, + contentHash: "h", + projectId: newProject.id, + runtimeEnvironmentId: newEnv.id, + version: `2024.2.${seedCounter++}`, + metadata: {}, + engine: "V2", + }, + }); + const newTask = await prisma17.backgroundWorkerTask.create({ + data: { + friendlyId: `t_${seedCounter++}`, + slug: `s-${seedCounter++}`, + filePath: "index.ts", + exportName: "myTask", + workerId: newWorker.id, + runtimeEnvironmentId: newEnv.id, + projectId: newProject.id, + }, + }); + const newQueue = await prisma17.taskQueue.create({ + data: { + friendlyId: `q_${seedCounter++}`, + name: `task/s-${seedCounter++}`, + runtimeEnvironmentId: newEnv.id, + projectId: newProject.id, + }, + }); + + const newScope = { + projectId: newProject.id, + environmentId: newEnv.id, + organizationId: newOrg.id, + backgroundWorkerId: newWorker.id, + workerTaskId: newTask.id, + queueId: newQueue.id, + }; + + // Pre-cutover (LEGACY DB) and post-cutover (NEW DB) run-ops rows for the SAME env. + await seedRunOpsSubgraph(prisma14, legacy); + await seedRunOpsSubgraph(prisma17, newScope); + + // Two-writer fan-out: one call cleans BOTH DBs. + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17, prisma14], + controlPlaneWriter: prisma14, + }).cleanupEnvironment(legacy.environmentId); + + expectSubgraphAllZero(await subgraphCountsForScope(prisma14, legacy)); + expectSubgraphAllZero(await subgraphCountsForScope(prisma17, newScope)); + + // The orphan-leak guard: re-seed and run a mis-built SINGLE-writer service; it must leave the + // OTHER DB's rows behind. + await seedRunOpsSubgraph(prisma14, legacy); + await seedRunOpsSubgraph(prisma17, newScope); + + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17], + controlPlaneWriter: prisma14, + }).cleanupEnvironment(legacy.environmentId); + + // NEW DB cleaned, LEGACY DB orphans remain — proving a one-handle delete leaks. + expectSubgraphAllZero(await subgraphCountsForScope(prisma17, newScope)); + const leaked = await subgraphCountsForScope(prisma14, legacy); + expect(leaked.taskRun).toBeGreaterThan(0); + } + ); +}); diff --git a/internal-packages/database/prisma/migrations/20260626130000_drop_task_run_to_task_run_tag_join/migration.sql b/internal-packages/database/prisma/migrations/20260626130000_drop_task_run_to_task_run_tag_join/migration.sql new file mode 100644 index 00000000000..a6f9b5fd461 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260626130000_drop_task_run_to_task_run_tag_join/migration.sql @@ -0,0 +1,9 @@ +/* + Warnings: + + - You are about to drop the `_TaskRunToTaskRunTag` table. If the table is not empty, all the data it contains will be lost. + +*/ + +-- DropTable +DROP TABLE IF EXISTS "_TaskRunToTaskRunTag" CASCADE; diff --git a/internal-packages/database/prisma/migrations/20260626140000_drop_bulk_action_item_group_fk/migration.sql b/internal-packages/database/prisma/migrations/20260626140000_drop_bulk_action_item_group_fk/migration.sql new file mode 100644 index 00000000000..3f2dec3db3a --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260626140000_drop_bulk_action_item_group_fk/migration.sql @@ -0,0 +1,2 @@ +-- DropForeignKey +ALTER TABLE "BulkActionItem" DROP CONSTRAINT "BulkActionItem_groupId_fkey"; diff --git a/internal-packages/database/prisma/migrations/20260626150000_drop_batch_task_run_environment_fk/migration.sql b/internal-packages/database/prisma/migrations/20260626150000_drop_batch_task_run_environment_fk/migration.sql new file mode 100644 index 00000000000..9704bb66145 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260626150000_drop_batch_task_run_environment_fk/migration.sql @@ -0,0 +1,2 @@ +-- DropForeignKey +ALTER TABLE "BatchTaskRun" DROP CONSTRAINT "BatchTaskRun_runtimeEnvironmentId_fkey"; diff --git a/internal-packages/database/prisma/migrations/20260629120000_drop_run_ops_control_plane_foreign_keys/migration.sql b/internal-packages/database/prisma/migrations/20260629120000_drop_run_ops_control_plane_foreign_keys/migration.sql new file mode 100644 index 00000000000..571716f7bef --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260629120000_drop_run_ops_control_plane_foreign_keys/migration.sql @@ -0,0 +1,26 @@ +-- Run-ops DB split: drop run-ops tables' cross-database FKs to control-plane tables (they +-- can't be enforced on the run-ops DB, which has no control-plane tables). Mirrors the +-- earlier TaskRun/BatchTaskRun drops; IF EXISTS so it's idempotent across both databases. + +-- Waitpoint +ALTER TABLE "Waitpoint" DROP CONSTRAINT IF EXISTS "Waitpoint_projectId_fkey"; +ALTER TABLE "Waitpoint" DROP CONSTRAINT IF EXISTS "Waitpoint_environmentId_fkey"; + +-- TaskRunWaitpoint +ALTER TABLE "TaskRunWaitpoint" DROP CONSTRAINT IF EXISTS "TaskRunWaitpoint_projectId_fkey"; + +-- TaskRunCheckpoint +ALTER TABLE "TaskRunCheckpoint" DROP CONSTRAINT IF EXISTS "TaskRunCheckpoint_projectId_fkey"; +ALTER TABLE "TaskRunCheckpoint" DROP CONSTRAINT IF EXISTS "TaskRunCheckpoint_runtimeEnvironmentId_fkey"; + +-- TaskRunAttempt +ALTER TABLE "TaskRunAttempt" DROP CONSTRAINT IF EXISTS "TaskRunAttempt_backgroundWorkerId_fkey"; +ALTER TABLE "TaskRunAttempt" DROP CONSTRAINT IF EXISTS "TaskRunAttempt_backgroundWorkerTaskId_fkey"; +ALTER TABLE "TaskRunAttempt" DROP CONSTRAINT IF EXISTS "TaskRunAttempt_runtimeEnvironmentId_fkey"; + +-- TaskRunTag +ALTER TABLE "TaskRunTag" DROP CONSTRAINT IF EXISTS "TaskRunTag_projectId_fkey"; + +-- WaitpointTag +ALTER TABLE "WaitpointTag" DROP CONSTRAINT IF EXISTS "WaitpointTag_projectId_fkey"; +ALTER TABLE "WaitpointTag" DROP CONSTRAINT IF EXISTS "WaitpointTag_environmentId_fkey"; diff --git a/internal-packages/database/prisma/migrations/20260630120000_drop_project_alert_run_subgraph_foreign_keys/migration.sql b/internal-packages/database/prisma/migrations/20260630120000_drop_project_alert_run_subgraph_foreign_keys/migration.sql new file mode 100644 index 00000000000..ae5adafa826 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260630120000_drop_project_alert_run_subgraph_foreign_keys/migration.sql @@ -0,0 +1,7 @@ +-- Run-ops DB split: drop ProjectAlert's cross-DB FKs into the run subgraph (taskRunId -> TaskRun, +-- taskRunAttemptId -> TaskRunAttempt). A ksuid run lives only on the dedicated run-ops DB so the FK +-- can't be enforced; scalar columns are kept and the run is resolved via runStore.findRun. Mirrors +-- ca1a4e18e; IF EXISTS so it's idempotent across both databases. + +ALTER TABLE "ProjectAlert" DROP CONSTRAINT IF EXISTS "ProjectAlert_taskRunId_fkey"; +ALTER TABLE "ProjectAlert" DROP CONSTRAINT IF EXISTS "ProjectAlert_taskRunAttemptId_fkey"; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 7cf441eb9f8..6220fc265a4 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -369,7 +369,6 @@ model RuntimeEnvironment { backgroundWorkerTasks BackgroundWorkerTask[] taskRuns TaskRun[] taskQueues TaskQueue[] - batchTaskRuns BatchTaskRun[] environmentVariableValues EnvironmentVariableValue[] checkpoints Checkpoint[] workerDeployments WorkerDeployment[] @@ -939,7 +938,6 @@ model TaskRun { updatedAt DateTime @updatedAt attempts TaskRunAttempt[] @relation("attempts") - tags TaskRunTag[] /// Denormized column that holds the raw tags runTags String[] @@ -1004,8 +1002,6 @@ model TaskRun { CheckpointRestoreEvent CheckpointRestoreEvent[] executionSnapshots TaskRunExecutionSnapshot[] - alerts ProjectAlert[] - scheduleInstanceId String? scheduleId String? @@ -1574,8 +1570,6 @@ model TaskRunTag { friendlyId String @unique - runs TaskRun[] - project Project @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade) projectId String @@ -1674,7 +1668,6 @@ model TaskRunAttempt { checkpoints Checkpoint[] batchTaskRunItems BatchTaskRunItem[] CheckpointRestoreEvent CheckpointRestoreEvent[] - alerts ProjectAlert[] childRuns TaskRun[] @relation("TaskParentRunAttempt") @@unique([taskRunId, number]) @@ -1872,7 +1865,6 @@ model BatchTaskRun { friendlyId String @unique idempotencyKey String? idempotencyKeyExpiresAt DateTime? - runtimeEnvironment RuntimeEnvironment @relation(fields: [runtimeEnvironmentId], references: [id], onDelete: Cascade, onUpdate: Cascade) status BatchTaskRunStatus @default(PENDING) runtimeEnvironmentId String /// This only includes new runs, not idempotent runs. @@ -2373,10 +2365,10 @@ model ProjectAlert { type ProjectAlertType - taskRunAttempt TaskRunAttempt? @relation(fields: [taskRunAttemptId], references: [id], onDelete: Cascade, onUpdate: Cascade) + // Run-ops split: these reference run-subgraph rows that live on the dedicated run-ops DB for ksuid + // runs, so the cross-DB FK can't hold. Scalar-only; the run is resolved via runStore.findRun. taskRunAttemptId String? - taskRun TaskRun? @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) taskRunId String? workerDeployment WorkerDeployment? @relation(fields: [workerDeploymentId], references: [id], onDelete: Cascade, onUpdate: Cascade) @@ -2488,7 +2480,6 @@ model BulkActionGroup { environmentId String? type BulkActionType - items BulkActionItem[] /// When the group is created it's pending. After we've processed all the items it's completed. This does not mean the associated runs are completed. status BulkActionStatus @default(PENDING) @@ -2556,7 +2547,6 @@ model BulkActionItem { /// @deprecated not used in new BulkActions friendlyId String? - group BulkActionGroup @relation(fields: [groupId], references: [id], onDelete: Cascade, onUpdate: Cascade) groupId String type BulkActionType diff --git a/internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts b/internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts new file mode 100644 index 00000000000..1e3cfc3234d --- /dev/null +++ b/internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts @@ -0,0 +1,127 @@ +// ProjectAlert.taskRunId/taskRunAttemptId FKs point INTO the run subgraph. A ksuid run lives ONLY +// on the dedicated run-ops DB (prisma17), so `projectAlert.create({ taskRunId: })` on +// control-plane (prisma14) violates the FK and the alert is silently dropped. After the FK drop + +// @relation removal the create succeeds; the read path resolves the run via runStore.findRun. +// Asserts the create SUCCEEDS — RED (FK violation) before the fix, GREEN after. + +import { heteroRunOpsPostgresTest } from "@internal/testcontainers"; +import type { PrismaClient } from "@trigger.dev/database"; +import type { RunOpsPrismaClient } from "@internal/run-ops-database"; +import { describe, expect } from "vitest"; + +// 27-char internal id → ksuid → NEW (lives only on the dedicated run-ops DB). +const KSUID_27 = "k".repeat(27); + +async function seedControlPlaneAlertPrereqs(prisma: PrismaClient, suffix: string) { + const organization = await prisma.organization.create({ + data: { title: `Org ${suffix}`, slug: `org-${suffix}` }, + }); + const project = await prisma.project.create({ + data: { + name: `Project ${suffix}`, + slug: `project-${suffix}`, + externalRef: `proj_${suffix}`, + organizationId: organization.id, + }, + }); + const environment = await prisma.runtimeEnvironment.create({ + data: { + type: "PRODUCTION", + slug: "prod", + projectId: project.id, + organizationId: organization.id, + apiKey: `tr_prod_${suffix}`, + pkApiKey: `pk_prod_${suffix}`, + shortcode: `short_${suffix}`, + }, + }); + const channel = await prisma.projectAlertChannel.create({ + data: { + friendlyId: `alert_channel_${suffix}`, + type: "EMAIL", + name: "Email", + properties: { type: "EMAIL", email: "alerts@example.com" }, + alertTypes: ["TASK_RUN"], + projectId: project.id, + }, + }); + return { organization, project, environment, channel }; +} + +describe("ProjectAlert control-plane → run-subgraph FK reconciliation", () => { + heteroRunOpsPostgresTest( + "creating a TASK_RUN alert with a ksuid taskRunId (run only on the run-ops DB) succeeds on control-plane", + async ({ prisma14, prisma17 }) => { + const suffix = "alert-ksuid"; + const { project, environment, channel } = await seedControlPlaneAlertPrereqs(prisma14, suffix); + + // The run exists ONLY on the dedicated run-ops DB (prisma17), never on control-plane. + await (prisma17 as RunOpsPrismaClient).taskRun.create({ + data: { + id: KSUID_27, + friendlyId: `run_${suffix}`, + engine: "V2", + status: "COMPLETED_WITH_ERRORS", + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + traceId: `trace_${suffix}`, + spanId: `span_${suffix}`, + queue: "task/my-task", + runtimeEnvironmentId: environment.id, + projectId: project.id, + organizationId: project.organizationId, + environmentType: "PRODUCTION", + }, + }); + + // Control-plane has no TaskRun row for KSUID_27. With the FK present this throws P2003; + // after the FK is dropped + the @relation removed it succeeds. + const alert = await prisma14.projectAlert.create({ + data: { + friendlyId: `alert_${suffix}`, + channelId: channel.id, + projectId: project.id, + environmentId: environment.id, + status: "PENDING", + type: "TASK_RUN", + taskRunId: KSUID_27, + }, + }); + + expect(alert.taskRunId).toBe(KSUID_27); + + // The scalar round-trips and can be re-read off the control-plane row (the read path resolves + // the actual run via runStore.findRun against the run-ops DB). + const reread = await prisma14.projectAlert.findUniqueOrThrow({ where: { id: alert.id } }); + expect(reread.taskRunId).toBe(KSUID_27); + }, + 120_000 + ); + + heteroRunOpsPostgresTest( + "creating a TASK_RUN_ATTEMPT alert with a ksuid taskRunAttemptId (attempt only on the run-ops DB) succeeds on control-plane", + async ({ prisma14 }) => { + const suffix = "alert-ksuid-attempt"; + const { project, environment, channel } = await seedControlPlaneAlertPrereqs(prisma14, suffix); + + // A ksuid attempt id with no matching control-plane TaskRunAttempt row. With the FK present + // this throws P2003; after the FK is dropped it succeeds. + const attemptId = "a".repeat(27); + const alert = await prisma14.projectAlert.create({ + data: { + friendlyId: `alert_${suffix}`, + channelId: channel.id, + projectId: project.id, + environmentId: environment.id, + status: "PENDING", + type: "TASK_RUN_ATTEMPT", + taskRunAttemptId: attemptId, + }, + }); + + expect(alert.taskRunAttemptId).toBe(attemptId); + }, + 120_000 + ); +}); From 002b5cb26ec7ea63bc0da14e87153d341e801118 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Thu, 2 Jul 2026 14:37:42 +0100 Subject: [PATCH 2/6] chore(run-ops split): strip test-plan enumeration scaffolding from pr11 activation tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove Test A–F enumeration prefixes from the run-ops cascade-cleanup comments and the RED/GREEN TDD framing from the ProjectAlert control-plane FK reconciliation test header. Comment-only; no product logic, test behavior, or migration SQL changed. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../runOpsCascadeCleanup.server.test.ts | 12 ++++++------ .../src/PostgresRunStore.controlPlaneAlertFk.test.ts | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts b/apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts index 2065ae06b0a..3c7cf3c5c84 100644 --- a/apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts +++ b/apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts @@ -384,7 +384,7 @@ describe("RunOpsCascadeCleanupService", () => { } ); - // Test A: env cleanup over both writers empties the subgraph on BOTH DBs + BulkActionItem on the + // Env cleanup over both writers empties the subgraph on BOTH DBs + BulkActionItem on the // control-plane DB; a sibling scope survives. heteroPostgresTest( "cleanupEnvironment empties the subgraph across both writers, isolating a sibling env", @@ -422,7 +422,7 @@ describe("RunOpsCascadeCleanupService", () => { } ); - // Test B: project cleanup over both writers. + // Project cleanup over both writers. heteroPostgresTest( "cleanupProject empties the subgraph across both writers, isolating a sibling project", async ({ prisma14, prisma17 }) => { @@ -459,7 +459,7 @@ describe("RunOpsCascadeCleanupService", () => { } ); - // Test C: idempotency — a second cleanup returns all-zero counts and does not throw on either DB. + // Idempotency — a second cleanup returns all-zero counts and does not throw on either DB. heteroPostgresTest( "cleanupEnvironment is idempotent on a re-run across both FK configs", async ({ prisma14, prisma17 }) => { @@ -496,7 +496,7 @@ describe("RunOpsCascadeCleanupService", () => { } ); - // Test D: FK-retained vs FK-dropped fixtures reach an identical run-subgraph end-state. + // FK-retained vs FK-dropped fixtures reach an identical run-subgraph end-state. heteroPostgresTest( "FK-retained and FK-dropped fixtures reach an identical end-state after cleanup", async ({ prisma14, prisma17 }) => { @@ -522,7 +522,7 @@ describe("RunOpsCascadeCleanupService", () => { } ); - // Test E: single-DB mode — the same client passed twice de-dups so the pass runs once. + // Single-DB mode — the same client passed twice de-dups so the pass runs once. heteroPostgresTest( "single-DB: the same client passed twice de-dups so the delete pass runs exactly once", async ({ prisma14 }) => { @@ -558,7 +558,7 @@ describe("RunOpsCascadeCleanupService", () => { } ); - // Test F: the two-writer split — an env whose rows straddle both DBs (cuid runs on the LEGACY DB, + // The two-writer split — an env whose rows straddle both DBs (cuid runs on the LEGACY DB, // ksuid runs on the NEW DB) is fully cleaned by one call; a single-writer service leaks orphans. heteroPostgresTest( "two-writer fan-out cleans a split env on both DBs; single-writer leaves orphans", diff --git a/internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts b/internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts index 1e3cfc3234d..9fa7acd5453 100644 --- a/internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts +++ b/internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts @@ -2,7 +2,7 @@ // on the dedicated run-ops DB (prisma17), so `projectAlert.create({ taskRunId: })` on // control-plane (prisma14) violates the FK and the alert is silently dropped. After the FK drop + // @relation removal the create succeeds; the read path resolves the run via runStore.findRun. -// Asserts the create SUCCEEDS — RED (FK violation) before the fix, GREEN after. +// Asserts the create succeeds: it fails with an FK violation before the fix and succeeds after. import { heteroRunOpsPostgresTest } from "@internal/testcontainers"; import type { PrismaClient } from "@trigger.dev/database"; From 0529c5bf961b306a7975674b739dd1f5f8cf0a8e Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Thu, 2 Jul 2026 15:32:19 +0100 Subject: [PATCH 3/6] style(run-ops): apply oxfmt Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/PostgresRunStore.controlPlaneAlertFk.test.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts b/internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts index 9fa7acd5453..7dfffb4ee6c 100644 --- a/internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts +++ b/internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts @@ -53,7 +53,10 @@ describe("ProjectAlert control-plane → run-subgraph FK reconciliation", () => "creating a TASK_RUN alert with a ksuid taskRunId (run only on the run-ops DB) succeeds on control-plane", async ({ prisma14, prisma17 }) => { const suffix = "alert-ksuid"; - const { project, environment, channel } = await seedControlPlaneAlertPrereqs(prisma14, suffix); + const { project, environment, channel } = await seedControlPlaneAlertPrereqs( + prisma14, + suffix + ); // The run exists ONLY on the dedicated run-ops DB (prisma17), never on control-plane. await (prisma17 as RunOpsPrismaClient).taskRun.create({ @@ -103,7 +106,10 @@ describe("ProjectAlert control-plane → run-subgraph FK reconciliation", () => "creating a TASK_RUN_ATTEMPT alert with a ksuid taskRunAttemptId (attempt only on the run-ops DB) succeeds on control-plane", async ({ prisma14 }) => { const suffix = "alert-ksuid-attempt"; - const { project, environment, channel } = await seedControlPlaneAlertPrereqs(prisma14, suffix); + const { project, environment, channel } = await seedControlPlaneAlertPrereqs( + prisma14, + suffix + ); // A ksuid attempt id with no matching control-plane TaskRunAttempt row. With the FK present // this throws P2003; after the FK is dropped it succeeds. From f5a5fc3f0f44fb4b4e86a0978e800fb661bc7f3d Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Thu, 2 Jul 2026 20:08:30 +0100 Subject: [PATCH 4/6] fix(run-ops split): scalar-only bulk-action queries after group relation drop The activation schema removed the BulkActionGroup.items and BulkActionItem.group Prisma relations, but PerformBulkActionService still queried them via include, which would fail once the client regenerates. Switch to scalar groupId lookups and read the action kind from BulkActionItem.type. Also drop a stale test comment. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../v3/services/bulk/performBulkAction.server.ts | 16 +++++++++------- .../test/dropTaskRunToTaskRunTagJoin.test.ts | 4 +--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts b/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts index a982ccad2e0..b6ea92aace6 100644 --- a/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts +++ b/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts @@ -10,7 +10,6 @@ export class PerformBulkActionService extends BaseService { const item = await this._prisma.bulkActionItem.findFirst({ where: { id: bulkActionItemId }, include: { - group: true, sourceRun: true, destinationRun: true, }, @@ -24,7 +23,7 @@ export class PerformBulkActionService extends BaseService { return; } - switch (item.group.type) { + switch (item.type) { case "REPLAY": { const service = new ReplayTaskRunService(this._prisma); const result = await service.call(item.sourceRun, { triggerSource: "dashboard" }); @@ -57,7 +56,7 @@ export class PerformBulkActionService extends BaseService { break; } default: { - assertNever(item.group.type); + assertNever(item.type); } } @@ -94,17 +93,20 @@ export class PerformBulkActionService extends BaseService { public async call(bulkActionGroupId: string) { const actionGroup = await this._prisma.bulkActionGroup.findFirst({ - include: { - items: true, - }, where: { id: bulkActionGroupId }, + select: { id: true }, }); if (!actionGroup) { return; } - for (const item of actionGroup.items) { + const items = await this._prisma.bulkActionItem.findMany({ + where: { groupId: bulkActionGroupId }, + select: { id: true }, + }); + + for (const item of items) { await this.enqueueBulkActionItem(item.id, bulkActionGroupId); } } diff --git a/apps/webapp/test/dropTaskRunToTaskRunTagJoin.test.ts b/apps/webapp/test/dropTaskRunToTaskRunTagJoin.test.ts index e6f270aa861..150ce44bf6f 100644 --- a/apps/webapp/test/dropTaskRunToTaskRunTagJoin.test.ts +++ b/apps/webapp/test/dropTaskRunToTaskRunTagJoin.test.ts @@ -1,6 +1,4 @@ -// Single-version proof for dropping the dead `_TaskRunToTaskRunTag` implicit join. The -// cross-version follow-up (via heteroPostgresTest) is tracked separately; those helpers -// do not exist in this tree yet. +// Single-version proof for dropping the dead `_TaskRunToTaskRunTag` implicit join. import { describe, expect } from "vitest"; import { postgresTest } from "@internal/testcontainers"; From 3e6cd39a1e6be459b2d6a94974424263478abd33 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Fri, 3 Jul 2026 08:50:41 +0100 Subject: [PATCH 5/6] chore: add server-changes for pr10 Co-Authored-By: Claude Opus 4.8 (1M context) --- .server-changes/run-ops-split-activation.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .server-changes/run-ops-split-activation.md diff --git a/.server-changes/run-ops-split-activation.md b/.server-changes/run-ops-split-activation.md new file mode 100644 index 00000000000..2697b14f1a6 --- /dev/null +++ b/.server-changes/run-ops-split-activation.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Enable the dedicated run-ops database split: run records and their related rows are served from a separate database, with cross-database references resolved in application code instead of database foreign keys. From 0a09a64788fd335d6bb4fc31eda4eadb07871e58 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Fri, 3 Jul 2026 11:34:10 +0100 Subject: [PATCH 6/6] chore(run-ops): fix lint/format for main lint rules Co-Authored-By: Claude Opus 4.8 (1M context) --- .../v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts b/apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts index 3c7cf3c5c84..d9250496955 100644 --- a/apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts +++ b/apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts @@ -20,12 +20,7 @@ const SUBGRAPH_TABLES = [ "batchTaskRun", ] as const; -// Control-plane-resident model: exists ONLY in @trigger.dev/database, NOT in the run-ops subset. -// Seeded + counted only against the control-plane writer. -const CONTROL_PLANE_TABLES = ["bulkActionItem"] as const; - type SubgraphTable = (typeof SUBGRAPH_TABLES)[number]; -type ControlPlaneTable = (typeof CONTROL_PLANE_TABLES)[number]; let seedCounter = 0;