diff --git a/.server-changes/run-ops-split-activation.md b/.server-changes/run-ops-split-activation.md new file mode 100644 index 00000000000..2697b14f1a6 --- /dev/null +++ b/.server-changes/run-ops-split-activation.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Enable the dedicated run-ops database split: run records and their related rows are served from a separate database, with cross-database references resolved in application code instead of database foreign keys. diff --git a/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts b/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts index a982ccad2e0..b6ea92aace6 100644 --- a/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts +++ b/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts @@ -10,7 +10,6 @@ export class PerformBulkActionService extends BaseService { const item = await this._prisma.bulkActionItem.findFirst({ where: { id: bulkActionItemId }, include: { - group: true, sourceRun: true, destinationRun: true, }, @@ -24,7 +23,7 @@ export class PerformBulkActionService extends BaseService { return; } - switch (item.group.type) { + switch (item.type) { case "REPLAY": { const service = new ReplayTaskRunService(this._prisma); const result = await service.call(item.sourceRun, { triggerSource: "dashboard" }); @@ -57,7 +56,7 @@ export class PerformBulkActionService extends BaseService { break; } default: { - assertNever(item.group.type); + assertNever(item.type); } } @@ -94,17 +93,20 @@ export class PerformBulkActionService extends BaseService { public async call(bulkActionGroupId: string) { const actionGroup = await this._prisma.bulkActionGroup.findFirst({ - include: { - items: true, - }, where: { id: bulkActionGroupId }, + select: { id: true }, }); if (!actionGroup) { return; } - for (const item of actionGroup.items) { + const items = await this._prisma.bulkActionItem.findMany({ + where: { groupId: bulkActionGroupId }, + select: { id: true }, + }); + + for (const item of items) { await this.enqueueBulkActionItem(item.id, bulkActionGroupId); } } diff --git a/apps/webapp/test/batchTaskRunEnvironmentFkDrop.test.ts b/apps/webapp/test/batchTaskRunEnvironmentFkDrop.test.ts new file mode 100644 index 00000000000..ba8c29a1c38 --- /dev/null +++ b/apps/webapp/test/batchTaskRunEnvironmentFkDrop.test.ts @@ -0,0 +1,169 @@ +// Proof for dropping the canonical BatchTaskRun -> RuntimeEnvironment FK +// (constraint "BatchTaskRun_runtimeEnvironmentId_fkey", onDelete: Cascade) while keeping the +// runtimeEnvironmentId scalar and its compound @@unique/@@index. BatchTaskRun is run-ops and +// RuntimeEnvironment is control-plane, so the two may live on different servers; create-time +// integrity is preserved app-side via the ControlPlaneResolver's assertEnvExists. Env-delete +// orphan cleanup is handled separately — here the batch row is tolerated. + +import { heteroPostgresTest, postgresTest } from "@internal/testcontainers"; +import type { PrismaClient } from "@trigger.dev/database"; +import { describe, expect, vi } from "vitest"; +import { ControlPlaneCache } from "~/v3/runOpsMigration/controlPlaneCache.server"; +import { + ControlPlaneReferenceError, + ControlPlaneResolver, +} from "~/v3/runOpsMigration/controlPlaneResolver.server"; + +// Cross-DB testcontainer spin-up + queries can exceed the 5s default on the first test. +vi.setConfig({ testTimeout: 60_000 }); + +let seedCounter = 0; + +async function seedEnvironment(prisma: PrismaClient) { + const n = seedCounter++; + const organization = await prisma.organization.create({ + data: { title: `Org ${n}`, slug: `org-${n}` }, + }); + const project = await prisma.project.create({ + data: { + name: `Project ${n}`, + slug: `project-${n}`, + externalRef: `proj_${n}`, + organizationId: organization.id, + }, + }); + const environment = await prisma.runtimeEnvironment.create({ + data: { + type: "PRODUCTION", + slug: `env-${n}`, + projectId: project.id, + organizationId: organization.id, + apiKey: `tr_prod_${n}`, + pkApiKey: `pk_prod_${n}`, + shortcode: `short_${n}`, + }, + }); + return { organization, project, environment }; +} + +let batchCounter = 0; + +async function createBatch(prisma: PrismaClient, runtimeEnvironmentId: string) { + const n = batchCounter++; + return prisma.batchTaskRun.create({ + data: { + friendlyId: `batch_${n}`, + runtimeEnvironmentId, + runCount: 1, + runIds: [], + batchVersion: "runengine:v2", + }, + }); +} + +// Asserts the post-migration state of BatchTaskRun on a given client: the FK is gone, but the +// scalar and both compound constraints are retained. Shared by the single-version and the +// cross-version suites. +async function assertSchemaState(prisma: PrismaClient) { + const foreignKeys = await prisma.$queryRaw<{ constraint_name: string }[]>` + SELECT constraint_name + FROM information_schema.table_constraints + WHERE table_name = 'BatchTaskRun' + AND constraint_type = 'FOREIGN KEY' + `; + expect(foreignKeys.map((c) => c.constraint_name)).not.toContain( + "BatchTaskRun_runtimeEnvironmentId_fkey" + ); + + const columns = await prisma.$queryRaw<{ column_name: string }[]>` + SELECT column_name + FROM information_schema.columns + WHERE table_name = 'BatchTaskRun' + AND column_name = 'runtimeEnvironmentId' + `; + expect(columns).toHaveLength(1); + + // The @@unique([runtimeEnvironmentId, idempotencyKey]) and + // @@index([runtimeEnvironmentId, id(sort: Desc)]) both survive the FK drop. + const indexes = await prisma.$queryRaw<{ indexdef: string }[]>` + SELECT indexdef FROM pg_indexes WHERE tablename = 'BatchTaskRun' + `; + const defs = indexes.map((i) => i.indexdef); + const hasUnique = defs.some( + (d) => /UNIQUE/i.test(d) && d.includes("runtimeEnvironmentId") && d.includes("idempotencyKey") + ); + const hasIndex = defs.some( + (d) => !/UNIQUE/i.test(d) && d.includes("runtimeEnvironmentId") && /\bid\b/.test(d) + ); + expect(hasUnique).toBe(true); + expect(hasIndex).toBe(true); +} + +// Inserts an env + batch, deletes the env, and asserts the batch survives (cascade gone). +async function assertOrphanTolerated(prisma: PrismaClient) { + const { environment } = await seedEnvironment(prisma); + const batch = await createBatch(prisma, environment.id); + + await prisma.runtimeEnvironment.delete({ where: { id: environment.id } }); + + const survivor = await prisma.batchTaskRun.findFirst({ where: { id: batch.id } }); + expect(survivor).not.toBeNull(); + expect(survivor?.runtimeEnvironmentId).toBe(environment.id); +} + +describe("drop BatchTaskRun -> RuntimeEnvironment FK", () => { + postgresTest("FK constraint absent; scalar + unique + index retained", async ({ prisma }) => { + await assertSchemaState(prisma); + }); + + postgresTest( + "deleting the env leaves the BatchTaskRun row alive (no cascade; orphan cleanup handled separately)", + async ({ prisma }) => { + await assertOrphanTolerated(prisma); + } + ); + + postgresTest( + "app-side env validation: assertEnvExists rejects an invalid env and a valid-env create succeeds by scalar", + async ({ prisma }) => { + const { environment } = await seedEnvironment(prisma); + + const resolver = new ControlPlaneResolver({ + controlPlanePrimary: prisma, + controlPlaneReplica: prisma, + cache: new ControlPlaneCache(), + splitEnabled: () => true, + }); + + // The exact guard call the create services place before batchTaskRun.create. + await expect(resolver.assertEnvExists("env_does_not_exist")).rejects.toBeInstanceOf( + ControlPlaneReferenceError + ); + + await expect(resolver.assertEnvExists(environment.id)).resolves.toBeUndefined(); + + // Once the guard passes, the batch is linked by the runtimeEnvironmentId scalar (no FK). + const batch = await createBatch(prisma, environment.id); + expect(batch.runtimeEnvironmentId).toBe(environment.id); + } + ); +}); + +// Cross-version gate: the migration applies and the post-state is identical across major versions. +describe("drop BatchTaskRun -> RuntimeEnvironment FK — cross-version (legacy + new Postgres)", () => { + heteroPostgresTest( + "migration applies and FK is absent on both the legacy and new databases", + async ({ prisma14, prisma17 }) => { + await assertSchemaState(prisma14); + await assertSchemaState(prisma17); + } + ); + + heteroPostgresTest( + "env delete leaves the batch orphaned on both the legacy and new databases", + async ({ prisma14, prisma17 }) => { + await assertOrphanTolerated(prisma14); + await assertOrphanTolerated(prisma17); + } + ); +}); diff --git a/apps/webapp/test/dropTaskRunToTaskRunTagJoin.test.ts b/apps/webapp/test/dropTaskRunToTaskRunTagJoin.test.ts new file mode 100644 index 00000000000..150ce44bf6f --- /dev/null +++ b/apps/webapp/test/dropTaskRunToTaskRunTagJoin.test.ts @@ -0,0 +1,64 @@ +// Single-version proof for dropping the dead `_TaskRunToTaskRunTag` implicit join. + +import { describe, expect } from "vitest"; +import { postgresTest } from "@internal/testcontainers"; + +describe("drop _TaskRunToTaskRunTag implicit join", () => { + postgresTest("runTags scalar round-trips and the join table is gone", async ({ prisma }) => { + const organization = await prisma.organization.create({ + data: { + title: "test", + slug: "test", + }, + }); + + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + + const taskRun = await prisma.taskRun.create({ + data: { + friendlyId: "run_1234", + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + payloadType: "application/json", + traceId: "1234", + spanId: "1234", + queue: "test", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + runTags: ["alpha", "beta"], + }, + }); + + const readBack = await prisma.taskRun.findFirstOrThrow({ + where: { id: taskRun.id }, + }); + expect(readBack.runTags).toEqual(["alpha", "beta"]); + + const result = await prisma.$queryRaw<{ t: string | null }[]>` + SELECT to_regclass('public._TaskRunToTaskRunTag')::text as t + `; + expect(result[0].t).toBeNull(); + }); +}); diff --git a/apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts b/apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts new file mode 100644 index 00000000000..d9250496955 --- /dev/null +++ b/apps/webapp/test/v3/runOpsMigration/runOpsCascadeCleanup.server.test.ts @@ -0,0 +1,660 @@ +import { heteroPostgresTest, heteroRunOpsPostgresTest } from "@internal/testcontainers"; +import type { PrismaClient } from "@trigger.dev/database"; +import type { RunOpsPrismaClient } from "@internal/run-ops-database"; +import { describe, expect, vi } from "vitest"; +import { RunOpsCascadeCleanupService } from "~/v3/runOpsMigration/runOpsCascadeCleanup.server"; + +// Cross-DB testcontainer spin-up + the multi-table seed can exceed the 5s default. +vi.setConfig({ testTimeout: 120_000 }); + +// Run-subgraph tables that live in BOTH the control-plane schema AND the dedicated run-ops SUBSET +// schema, so they are deleted on every run-ops writer. +const SUBGRAPH_TABLES = [ + "taskRun", + "taskRunAttempt", + "waitpoint", + "taskRunWaitpoint", + "taskRunCheckpoint", + "checkpoint", + "checkpointRestoreEvent", + "batchTaskRun", +] as const; + +type SubgraphTable = (typeof SUBGRAPH_TABLES)[number]; + +let seedCounter = 0; + +/** + * The cross-seam (run-ops -> control-plane) Cascade FKs that the cloud DB physically drops. Applied + * to the FK-dropped fixture to model cloud; the other side keeps them to model self-host. Only the + * run-subgraph constraints exist on the dedicated run-ops schema; BulkActionItem's are control-plane + * only and are dropped separately on a full-schema client. + */ +const SUBGRAPH_CROSS_SEAM_FKS: Array<{ table: string; constraint: string }> = [ + { table: "TaskRun", constraint: "TaskRun_runtimeEnvironmentId_fkey" }, + { table: "TaskRun", constraint: "TaskRun_projectId_fkey" }, + { table: "TaskRunAttempt", constraint: "TaskRunAttempt_runtimeEnvironmentId_fkey" }, + { table: "Waitpoint", constraint: "Waitpoint_environmentId_fkey" }, + { table: "Waitpoint", constraint: "Waitpoint_projectId_fkey" }, + { table: "TaskRunWaitpoint", constraint: "TaskRunWaitpoint_projectId_fkey" }, + { table: "TaskRunCheckpoint", constraint: "TaskRunCheckpoint_runtimeEnvironmentId_fkey" }, + { table: "TaskRunCheckpoint", constraint: "TaskRunCheckpoint_projectId_fkey" }, + { table: "Checkpoint", constraint: "Checkpoint_runtimeEnvironmentId_fkey" }, + { table: "Checkpoint", constraint: "Checkpoint_projectId_fkey" }, + { + table: "CheckpointRestoreEvent", + constraint: "CheckpointRestoreEvent_runtimeEnvironmentId_fkey", + }, + { table: "CheckpointRestoreEvent", constraint: "CheckpointRestoreEvent_projectId_fkey" }, + { table: "BatchTaskRun", constraint: "BatchTaskRun_runtimeEnvironmentId_fkey" }, +]; + +const BULK_ACTION_CROSS_SEAM_FKS: Array<{ table: string; constraint: string }> = [ + { table: "BulkActionItem", constraint: "BulkActionItem_sourceRunId_fkey" }, + { table: "BulkActionItem", constraint: "BulkActionItem_destinationRunId_fkey" }, +]; + +async function dropCrossSeamFks( + prisma: { $executeRawUnsafe: (q: string) => Promise }, + fks: Array<{ table: string; constraint: string }> +) { + for (const { table, constraint } of fks) { + await prisma.$executeRawUnsafe( + `ALTER TABLE "${table}" DROP CONSTRAINT IF EXISTS "${constraint}"` + ); + } +} + +type Scope = { projectId: string; environmentId: string; organizationId: string }; +type FullScope = Scope & { workerTaskId: string; queueId: string; backgroundWorkerId: string }; + +// Minimal structural client covering the control-plane prerequisites + run-subgraph models the +// seed/count helpers touch. Both PrismaClient and RunOpsPrismaClient are assignable. +type SeedClient = { + organization: any; + project: any; + runtimeEnvironment: any; + backgroundWorker: any; + backgroundWorkerTask: any; + taskQueue: any; + taskRun: any; + taskRunAttempt: any; + waitpoint: any; + taskRunWaitpoint: any; + taskRunCheckpoint: any; + checkpoint: any; + checkpointRestoreEvent: any; + batchTaskRun: any; +}; + +// Synthetic scope for the dedicated run-ops subset client, whose schema scalarizes every +// control-plane FK so no org/project/env rows are required. +function makeSyntheticScope(): FullScope { + const n = seedCounter++; + return { + projectId: `proj_synthetic_${n}`, + environmentId: `env_synthetic_${n}`, + organizationId: `org_synthetic_${n}`, + workerTaskId: `task_synthetic_${n}`, + queueId: `queue_synthetic_${n}`, + backgroundWorkerId: `worker_synthetic_${n}`, + }; +} + +/** Create the control-plane prerequisites (org, project, env, worker, task, queue). */ +async function seedScope(prisma: SeedClient): Promise { + const n = seedCounter++; + const org = await prisma.organization.create({ + data: { title: `Org ${n}`, slug: `org-${n}` }, + }); + const project = await prisma.project.create({ + data: { + name: `Project ${n}`, + slug: `project-${n}`, + externalRef: `proj_${n}`, + organizationId: org.id, + }, + }); + const environment = await prisma.runtimeEnvironment.create({ + data: { + type: "PRODUCTION", + slug: `env-${n}`, + projectId: project.id, + organizationId: org.id, + apiKey: `tr_prod_${n}`, + pkApiKey: `pk_prod_${n}`, + shortcode: `short_${n}`, + }, + }); + const worker = await prisma.backgroundWorker.create({ + data: { + friendlyId: `worker_${n}`, + contentHash: `hash_${n}`, + projectId: project.id, + runtimeEnvironmentId: environment.id, + version: `2024.1.${n}`, + metadata: {}, + engine: "V2", + }, + }); + const task = await prisma.backgroundWorkerTask.create({ + data: { + friendlyId: `task_${n}`, + slug: `my-task-${n}`, + filePath: "index.ts", + exportName: "myTask", + workerId: worker.id, + runtimeEnvironmentId: environment.id, + projectId: project.id, + }, + }); + const queue = await prisma.taskQueue.create({ + data: { + friendlyId: `queue_${n}`, + name: `task/my-task-${n}`, + runtimeEnvironmentId: environment.id, + projectId: project.id, + }, + }); + return { + projectId: project.id, + environmentId: environment.id, + organizationId: org.id, + workerTaskId: task.id, + queueId: queue.id, + backgroundWorkerId: worker.id, + }; +} + +/** + * Seed one full run-ops subgraph for a scope: a TaskRun tree (root + child), an attempt, a + * Waitpoint with a blocking edge (TaskRunWaitpoint), a TaskRunCheckpoint, a Checkpoint + a + * CheckpointRestoreEvent, and a BatchTaskRun with a member run. Returns the source + destination + * runs so a caller with a control-plane client can attach a BulkActionItem. + */ +async function seedRunOpsSubgraph( + prisma: SeedClient, + scope: Scope & { backgroundWorkerId: string; workerTaskId: string; queueId: string } +): Promise<{ sourceRunId: string; destinationRunId: string }> { + const n = seedCounter++; + const { projectId, environmentId } = scope; + + const baseRun = (suffix: string) => ({ + friendlyId: `run_${n}_${suffix}`, + taskIdentifier: `my-task-${n}`, + payload: "{}", + payloadType: "application/json", + traceId: `trace_${n}_${suffix}`, + spanId: `span_${n}_${suffix}`, + queue: `task/my-task-${n}`, + runtimeEnvironmentId: environmentId, + projectId, + }); + + const rootRun = await prisma.taskRun.create({ data: baseRun("root") }); + const childRun = await prisma.taskRun.create({ + data: { ...baseRun("child"), parentTaskRunId: rootRun.id, rootTaskRunId: rootRun.id }, + }); + + const attempt = await prisma.taskRunAttempt.create({ + data: { + friendlyId: `attempt_${n}`, + taskRunId: rootRun.id, + backgroundWorkerId: scope.backgroundWorkerId, + backgroundWorkerTaskId: scope.workerTaskId, + runtimeEnvironmentId: environmentId, + queueId: scope.queueId, + }, + }); + + const waitpoint = await prisma.waitpoint.create({ + data: { + friendlyId: `wp_${n}`, + type: "MANUAL", + idempotencyKey: `wp_idem_${n}`, + userProvidedIdempotencyKey: false, + environmentId, + projectId, + }, + }); + await prisma.taskRunWaitpoint.create({ + data: { taskRunId: rootRun.id, waitpointId: waitpoint.id, projectId }, + }); + + await prisma.taskRunCheckpoint.create({ + data: { + friendlyId: `trcp_${n}`, + type: "DOCKER", + location: "loc", + runtimeEnvironmentId: environmentId, + projectId, + }, + }); + + const checkpoint = await prisma.checkpoint.create({ + data: { + friendlyId: `cp_${n}`, + type: "DOCKER", + location: "loc", + imageRef: "ref", + runId: rootRun.id, + attemptId: attempt.id, + runtimeEnvironmentId: environmentId, + projectId, + }, + }); + await prisma.checkpointRestoreEvent.create({ + data: { + type: "CHECKPOINT", + checkpointId: checkpoint.id, + runId: rootRun.id, + attemptId: attempt.id, + runtimeEnvironmentId: environmentId, + projectId, + }, + }); + + const batch = await prisma.batchTaskRun.create({ + data: { friendlyId: `batch_${n}`, runtimeEnvironmentId: environmentId }, + }); + await prisma.taskRun.update({ where: { id: childRun.id }, data: { batchId: batch.id } }); + + const sourceRun = await prisma.taskRun.create({ data: baseRun("src") }); + const destRun = await prisma.taskRun.create({ data: baseRun("dst") }); + return { sourceRunId: sourceRun.id, destinationRunId: destRun.id }; +} + +/** Attach a BulkActionItem (control-plane-resident) over the given source/destination runs. */ +async function seedBulkActionItem( + prisma: PrismaClient, + runs: { sourceRunId: string; destinationRunId: string } +): Promise { + await prisma.bulkActionItem.create({ + data: { + groupId: `grp_${seedCounter++}`, + type: "REPLAY", + sourceRunId: runs.sourceRunId, + destinationRunId: runs.destinationRunId, + }, + }); +} + +async function subgraphCountsForScope( + prisma: SeedClient, + scope: { projectId: string } +): Promise> { + const { projectId } = scope; + return { + taskRun: await prisma.taskRun.count({ where: { projectId } }), + taskRunAttempt: await prisma.taskRunAttempt.count({ where: { taskRun: { projectId } } }), + waitpoint: await prisma.waitpoint.count({ where: { projectId } }), + taskRunWaitpoint: await prisma.taskRunWaitpoint.count({ where: { projectId } }), + taskRunCheckpoint: await prisma.taskRunCheckpoint.count({ where: { projectId } }), + checkpoint: await prisma.checkpoint.count({ where: { projectId } }), + checkpointRestoreEvent: await prisma.checkpointRestoreEvent.count({ where: { projectId } }), + batchTaskRun: await prisma.batchTaskRun.count({ + where: { runs: { some: { projectId } } }, + }), + }; +} + +async function bulkActionItemCountForScope( + prisma: PrismaClient, + scope: { projectId: string } +): Promise { + return prisma.bulkActionItem.count({ where: { sourceRun: { projectId: scope.projectId } } }); +} + +function expectSubgraphAllZero(counts: Record) { + for (const table of SUBGRAPH_TABLES) { + expect(counts[table], `${table} should be empty`).toBe(0); + } +} + +function expectSubgraphAllNonZero(counts: Record) { + for (const table of SUBGRAPH_TABLES) { + expect(counts[table], `${table} should be seeded`).toBeGreaterThan(0); + } +} + +describe("RunOpsCascadeCleanupService", () => { + // REGRESSION: the NEW run-ops writer is a real RunOpsPrismaClient over the dedicated + // SUBSET schema — it has NO `bulkActionItem` delegate. Before the fix, the per-writer pass called + // `writer.bulkActionItem.deleteMany` on this client => TypeError (Cannot read properties of + // undefined). After the fix, BulkActionItem is cleaned ONLY on the control-plane writer (prisma14), + // and the run-subgraph is deleted on the NEW DB without throwing. + heteroRunOpsPostgresTest( + "cleanupProject does not throw on the dedicated RunOpsPrismaClient and clears the new DB subgraph", + async ({ prisma14, prisma17 }) => { + await dropCrossSeamFks(prisma14, BULK_ACTION_CROSS_SEAM_FKS); + + // The dedicated run-ops subset schema scalarizes every control-plane FK, so the NEW DB needs + // NO org/project/env prereqs — seed the subgraph directly with synthetic scope ids. + const newScope = makeSyntheticScope(); + await seedRunOpsSubgraph(prisma17 as unknown as SeedClient, newScope); + + // BulkActionItem (control-plane-resident) lives only on the control-plane DB. + const cp = await seedScope(prisma14); + const cpRuns = await seedRunOpsSubgraph(prisma14, cp); + await seedBulkActionItem(prisma14, cpRuns); + + // prisma17 is a real RunOpsPrismaClient (subset, no bulkActionItem delegate); prisma14 is the + // control-plane writer. Before the fix this threw a TypeError on writer.bulkActionItem. + const result = await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17, prisma14 as unknown as RunOpsPrismaClient], + controlPlaneWriter: prisma14, + }).cleanupProject(newScope.projectId); + + expectSubgraphAllZero( + await subgraphCountsForScope(prisma17 as unknown as SeedClient, newScope) + ); + // BulkActionItem cleanup ran against the control-plane writer and deleted the control-plane + // project's item; the subset client was never asked for the missing delegate. + expect(result.bulkActionItem).toBeGreaterThanOrEqual(0); + } + ); + + // REGRESSION (env variant): same guarantee for cleanupEnvironment. + heteroRunOpsPostgresTest( + "cleanupEnvironment does not throw on the dedicated RunOpsPrismaClient and clears the new DB subgraph", + async ({ prisma14, prisma17 }) => { + await dropCrossSeamFks(prisma14, BULK_ACTION_CROSS_SEAM_FKS); + + const newScope = makeSyntheticScope(); + await seedRunOpsSubgraph(prisma17 as unknown as SeedClient, newScope); + + const cp = await seedScope(prisma14); + const cpRuns = await seedRunOpsSubgraph(prisma14, cp); + await seedBulkActionItem(prisma14, cpRuns); + + const result = await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17, prisma14 as unknown as RunOpsPrismaClient], + controlPlaneWriter: prisma14, + }).cleanupEnvironment(newScope.environmentId); + + expectSubgraphAllZero( + await subgraphCountsForScope(prisma17 as unknown as SeedClient, newScope) + ); + expect(result.bulkActionItem).toBeGreaterThanOrEqual(0); + } + ); + + // Env cleanup over both writers empties the subgraph on BOTH DBs + BulkActionItem on the + // control-plane DB; a sibling scope survives. + heteroPostgresTest( + "cleanupEnvironment empties the subgraph across both writers, isolating a sibling env", + async ({ prisma14, prisma17 }) => { + await dropCrossSeamFks(prisma17, SUBGRAPH_CROSS_SEAM_FKS); + await dropCrossSeamFks(prisma17, BULK_ACTION_CROSS_SEAM_FKS); + + const target14 = await seedScope(prisma14); + const target17 = await seedScope(prisma17); + const targetRuns14 = await seedRunOpsSubgraph(prisma14, target14); + await seedRunOpsSubgraph(prisma17, target17); + await seedBulkActionItem(prisma14, targetRuns14); + + const sibling14 = await seedScope(prisma14); + const sibling17 = await seedScope(prisma17); + const siblingRuns14 = await seedRunOpsSubgraph(prisma14, sibling14); + await seedRunOpsSubgraph(prisma17, sibling17); + await seedBulkActionItem(prisma14, siblingRuns14); + + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17, prisma14], + controlPlaneWriter: prisma14, + }).cleanupEnvironment(target14.environmentId); + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17, prisma14], + controlPlaneWriter: prisma14, + }).cleanupEnvironment(target17.environmentId); + + expectSubgraphAllZero(await subgraphCountsForScope(prisma14, target14)); + expectSubgraphAllZero(await subgraphCountsForScope(prisma17, target17)); + expect(await bulkActionItemCountForScope(prisma14, target14)).toBe(0); + expectSubgraphAllNonZero(await subgraphCountsForScope(prisma14, sibling14)); + expectSubgraphAllNonZero(await subgraphCountsForScope(prisma17, sibling17)); + expect(await bulkActionItemCountForScope(prisma14, sibling14)).toBeGreaterThan(0); + } + ); + + // Project cleanup over both writers. + heteroPostgresTest( + "cleanupProject empties the subgraph across both writers, isolating a sibling project", + async ({ prisma14, prisma17 }) => { + await dropCrossSeamFks(prisma17, SUBGRAPH_CROSS_SEAM_FKS); + await dropCrossSeamFks(prisma17, BULK_ACTION_CROSS_SEAM_FKS); + + const target14 = await seedScope(prisma14); + const target17 = await seedScope(prisma17); + const targetRuns14 = await seedRunOpsSubgraph(prisma14, target14); + await seedRunOpsSubgraph(prisma17, target17); + await seedBulkActionItem(prisma14, targetRuns14); + + const sibling14 = await seedScope(prisma14); + const sibling17 = await seedScope(prisma17); + const siblingRuns14 = await seedRunOpsSubgraph(prisma14, sibling14); + await seedRunOpsSubgraph(prisma17, sibling17); + await seedBulkActionItem(prisma14, siblingRuns14); + + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17, prisma14], + controlPlaneWriter: prisma14, + }).cleanupProject(target14.projectId); + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17, prisma14], + controlPlaneWriter: prisma14, + }).cleanupProject(target17.projectId); + + expectSubgraphAllZero(await subgraphCountsForScope(prisma14, target14)); + expectSubgraphAllZero(await subgraphCountsForScope(prisma17, target17)); + expect(await bulkActionItemCountForScope(prisma14, target14)).toBe(0); + expectSubgraphAllNonZero(await subgraphCountsForScope(prisma14, sibling14)); + expectSubgraphAllNonZero(await subgraphCountsForScope(prisma17, sibling17)); + expect(await bulkActionItemCountForScope(prisma14, sibling14)).toBeGreaterThan(0); + } + ); + + // Idempotency — a second cleanup returns all-zero counts and does not throw on either DB. + heteroPostgresTest( + "cleanupEnvironment is idempotent on a re-run across both FK configs", + async ({ prisma14, prisma17 }) => { + await dropCrossSeamFks(prisma17, SUBGRAPH_CROSS_SEAM_FKS); + await dropCrossSeamFks(prisma17, BULK_ACTION_CROSS_SEAM_FKS); + + const t14 = await seedScope(prisma14); + const t17 = await seedScope(prisma17); + const runs14 = await seedRunOpsSubgraph(prisma14, t14); + await seedRunOpsSubgraph(prisma17, t17); + await seedBulkActionItem(prisma14, runs14); + + const svc14 = new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma14], + controlPlaneWriter: prisma14, + }); + const svc17 = new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17], + controlPlaneWriter: prisma14, + }); + await svc14.cleanupEnvironment(t14.environmentId); + await svc17.cleanupEnvironment(t17.environmentId); + + const second14 = await svc14.cleanupEnvironment(t14.environmentId); + const second17 = await svc17.cleanupEnvironment(t17.environmentId); + + for (const result of [second14, second17]) { + for (const count of Object.values(result)) { + expect(count).toBe(0); + } + } + expectSubgraphAllZero(await subgraphCountsForScope(prisma14, t14)); + expectSubgraphAllZero(await subgraphCountsForScope(prisma17, t17)); + } + ); + + // FK-retained vs FK-dropped fixtures reach an identical run-subgraph end-state. + heteroPostgresTest( + "FK-retained and FK-dropped fixtures reach an identical end-state after cleanup", + async ({ prisma14, prisma17 }) => { + await dropCrossSeamFks(prisma17, SUBGRAPH_CROSS_SEAM_FKS); + + const s14 = await seedScope(prisma14); + const s17 = await seedScope(prisma17); + await seedRunOpsSubgraph(prisma14, s14); + await seedRunOpsSubgraph(prisma17, s17); + + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma14], + controlPlaneWriter: prisma14, + }).cleanupEnvironment(s14.environmentId); + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17], + controlPlaneWriter: prisma17, + }).cleanupEnvironment(s17.environmentId); + + const counts14 = await subgraphCountsForScope(prisma14, s14); + const counts17 = await subgraphCountsForScope(prisma17, s17); + expect(counts17).toEqual(counts14); + } + ); + + // Single-DB mode — the same client passed twice de-dups so the pass runs once. + heteroPostgresTest( + "single-DB: the same client passed twice de-dups so the delete pass runs exactly once", + async ({ prisma14 }) => { + const scope = await seedScope(prisma14); + await seedRunOpsSubgraph(prisma14, scope); + + const before = await subgraphCountsForScope(prisma14, scope); + + // Wrap the real client with a $extends query hook that counts deleteMany calls per model. NOT + // a mock — the query still runs against the container. If de-dup failed, the loop would run + // twice against this same client and taskRun.deleteMany would fire twice. + let taskRunDeleteManyCalls = 0; + const counting = prisma14.$extends({ + query: { + taskRun: { + async deleteMany({ args, query }) { + taskRunDeleteManyCalls++; + return query(args); + }, + }, + }, + }) as unknown as typeof prisma14; + + const result = await new RunOpsCascadeCleanupService({ + runOpsWriters: [counting, counting], + controlPlaneWriter: counting, + }).cleanupEnvironment(scope.environmentId); + + // De-dup ran the pass exactly once: one taskRun.deleteMany, count not double-summed. + expect(taskRunDeleteManyCalls).toBe(1); + expect(result.taskRun).toBe(before.taskRun); + expectSubgraphAllZero(await subgraphCountsForScope(prisma14, scope)); + } + ); + + // The two-writer split — an env whose rows straddle both DBs (cuid runs on the LEGACY DB, + // ksuid runs on the NEW DB) is fully cleaned by one call; a single-writer service leaks orphans. + heteroPostgresTest( + "two-writer fan-out cleans a split env on both DBs; single-writer leaves orphans", + async ({ prisma14, prisma17 }) => { + await dropCrossSeamFks(prisma17, SUBGRAPH_CROSS_SEAM_FKS); + + // One logical env that exists on both DBs (control-plane prereqs seeded on each), with the + // SAME env id, modelling the reference-equal control-plane row. We force a shared id + // by creating the legacy scope first, then mirroring its env id onto the new DB. + const legacy = await seedScope(prisma14); + const newOrg = await prisma17.organization.create({ + data: { id: legacy.organizationId, title: "mirror", slug: `mirror-${seedCounter++}` }, + }); + const newProject = await prisma17.project.create({ + data: { + id: legacy.projectId, + name: "mirror", + slug: `mirror-${seedCounter++}`, + externalRef: `mirror_${seedCounter++}`, + organizationId: newOrg.id, + }, + }); + const newEnv = await prisma17.runtimeEnvironment.create({ + data: { + id: legacy.environmentId, + type: "PRODUCTION", + slug: `mirror-${seedCounter++}`, + projectId: newProject.id, + organizationId: newOrg.id, + apiKey: `tr_${seedCounter++}`, + pkApiKey: `pk_${seedCounter++}`, + shortcode: `sc_${seedCounter++}`, + }, + }); + const newWorker = await prisma17.backgroundWorker.create({ + data: { + friendlyId: `w_${seedCounter++}`, + contentHash: "h", + projectId: newProject.id, + runtimeEnvironmentId: newEnv.id, + version: `2024.2.${seedCounter++}`, + metadata: {}, + engine: "V2", + }, + }); + const newTask = await prisma17.backgroundWorkerTask.create({ + data: { + friendlyId: `t_${seedCounter++}`, + slug: `s-${seedCounter++}`, + filePath: "index.ts", + exportName: "myTask", + workerId: newWorker.id, + runtimeEnvironmentId: newEnv.id, + projectId: newProject.id, + }, + }); + const newQueue = await prisma17.taskQueue.create({ + data: { + friendlyId: `q_${seedCounter++}`, + name: `task/s-${seedCounter++}`, + runtimeEnvironmentId: newEnv.id, + projectId: newProject.id, + }, + }); + + const newScope = { + projectId: newProject.id, + environmentId: newEnv.id, + organizationId: newOrg.id, + backgroundWorkerId: newWorker.id, + workerTaskId: newTask.id, + queueId: newQueue.id, + }; + + // Pre-cutover (LEGACY DB) and post-cutover (NEW DB) run-ops rows for the SAME env. + await seedRunOpsSubgraph(prisma14, legacy); + await seedRunOpsSubgraph(prisma17, newScope); + + // Two-writer fan-out: one call cleans BOTH DBs. + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17, prisma14], + controlPlaneWriter: prisma14, + }).cleanupEnvironment(legacy.environmentId); + + expectSubgraphAllZero(await subgraphCountsForScope(prisma14, legacy)); + expectSubgraphAllZero(await subgraphCountsForScope(prisma17, newScope)); + + // The orphan-leak guard: re-seed and run a mis-built SINGLE-writer service; it must leave the + // OTHER DB's rows behind. + await seedRunOpsSubgraph(prisma14, legacy); + await seedRunOpsSubgraph(prisma17, newScope); + + await new RunOpsCascadeCleanupService({ + runOpsWriters: [prisma17], + controlPlaneWriter: prisma14, + }).cleanupEnvironment(legacy.environmentId); + + // NEW DB cleaned, LEGACY DB orphans remain — proving a one-handle delete leaks. + expectSubgraphAllZero(await subgraphCountsForScope(prisma17, newScope)); + const leaked = await subgraphCountsForScope(prisma14, legacy); + expect(leaked.taskRun).toBeGreaterThan(0); + } + ); +}); diff --git a/internal-packages/database/prisma/migrations/20260626130000_drop_task_run_to_task_run_tag_join/migration.sql b/internal-packages/database/prisma/migrations/20260626130000_drop_task_run_to_task_run_tag_join/migration.sql new file mode 100644 index 00000000000..a6f9b5fd461 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260626130000_drop_task_run_to_task_run_tag_join/migration.sql @@ -0,0 +1,9 @@ +/* + Warnings: + + - You are about to drop the `_TaskRunToTaskRunTag` table. If the table is not empty, all the data it contains will be lost. + +*/ + +-- DropTable +DROP TABLE IF EXISTS "_TaskRunToTaskRunTag" CASCADE; diff --git a/internal-packages/database/prisma/migrations/20260626140000_drop_bulk_action_item_group_fk/migration.sql b/internal-packages/database/prisma/migrations/20260626140000_drop_bulk_action_item_group_fk/migration.sql new file mode 100644 index 00000000000..3f2dec3db3a --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260626140000_drop_bulk_action_item_group_fk/migration.sql @@ -0,0 +1,2 @@ +-- DropForeignKey +ALTER TABLE "BulkActionItem" DROP CONSTRAINT "BulkActionItem_groupId_fkey"; diff --git a/internal-packages/database/prisma/migrations/20260626150000_drop_batch_task_run_environment_fk/migration.sql b/internal-packages/database/prisma/migrations/20260626150000_drop_batch_task_run_environment_fk/migration.sql new file mode 100644 index 00000000000..9704bb66145 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260626150000_drop_batch_task_run_environment_fk/migration.sql @@ -0,0 +1,2 @@ +-- DropForeignKey +ALTER TABLE "BatchTaskRun" DROP CONSTRAINT "BatchTaskRun_runtimeEnvironmentId_fkey"; diff --git a/internal-packages/database/prisma/migrations/20260629120000_drop_run_ops_control_plane_foreign_keys/migration.sql b/internal-packages/database/prisma/migrations/20260629120000_drop_run_ops_control_plane_foreign_keys/migration.sql new file mode 100644 index 00000000000..571716f7bef --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260629120000_drop_run_ops_control_plane_foreign_keys/migration.sql @@ -0,0 +1,26 @@ +-- Run-ops DB split: drop run-ops tables' cross-database FKs to control-plane tables (they +-- can't be enforced on the run-ops DB, which has no control-plane tables). Mirrors the +-- earlier TaskRun/BatchTaskRun drops; IF EXISTS so it's idempotent across both databases. + +-- Waitpoint +ALTER TABLE "Waitpoint" DROP CONSTRAINT IF EXISTS "Waitpoint_projectId_fkey"; +ALTER TABLE "Waitpoint" DROP CONSTRAINT IF EXISTS "Waitpoint_environmentId_fkey"; + +-- TaskRunWaitpoint +ALTER TABLE "TaskRunWaitpoint" DROP CONSTRAINT IF EXISTS "TaskRunWaitpoint_projectId_fkey"; + +-- TaskRunCheckpoint +ALTER TABLE "TaskRunCheckpoint" DROP CONSTRAINT IF EXISTS "TaskRunCheckpoint_projectId_fkey"; +ALTER TABLE "TaskRunCheckpoint" DROP CONSTRAINT IF EXISTS "TaskRunCheckpoint_runtimeEnvironmentId_fkey"; + +-- TaskRunAttempt +ALTER TABLE "TaskRunAttempt" DROP CONSTRAINT IF EXISTS "TaskRunAttempt_backgroundWorkerId_fkey"; +ALTER TABLE "TaskRunAttempt" DROP CONSTRAINT IF EXISTS "TaskRunAttempt_backgroundWorkerTaskId_fkey"; +ALTER TABLE "TaskRunAttempt" DROP CONSTRAINT IF EXISTS "TaskRunAttempt_runtimeEnvironmentId_fkey"; + +-- TaskRunTag +ALTER TABLE "TaskRunTag" DROP CONSTRAINT IF EXISTS "TaskRunTag_projectId_fkey"; + +-- WaitpointTag +ALTER TABLE "WaitpointTag" DROP CONSTRAINT IF EXISTS "WaitpointTag_projectId_fkey"; +ALTER TABLE "WaitpointTag" DROP CONSTRAINT IF EXISTS "WaitpointTag_environmentId_fkey"; diff --git a/internal-packages/database/prisma/migrations/20260630120000_drop_project_alert_run_subgraph_foreign_keys/migration.sql b/internal-packages/database/prisma/migrations/20260630120000_drop_project_alert_run_subgraph_foreign_keys/migration.sql new file mode 100644 index 00000000000..ae5adafa826 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260630120000_drop_project_alert_run_subgraph_foreign_keys/migration.sql @@ -0,0 +1,7 @@ +-- Run-ops DB split: drop ProjectAlert's cross-DB FKs into the run subgraph (taskRunId -> TaskRun, +-- taskRunAttemptId -> TaskRunAttempt). A ksuid run lives only on the dedicated run-ops DB so the FK +-- can't be enforced; scalar columns are kept and the run is resolved via runStore.findRun. Mirrors +-- ca1a4e18e; IF EXISTS so it's idempotent across both databases. + +ALTER TABLE "ProjectAlert" DROP CONSTRAINT IF EXISTS "ProjectAlert_taskRunId_fkey"; +ALTER TABLE "ProjectAlert" DROP CONSTRAINT IF EXISTS "ProjectAlert_taskRunAttemptId_fkey"; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 7cf441eb9f8..6220fc265a4 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -369,7 +369,6 @@ model RuntimeEnvironment { backgroundWorkerTasks BackgroundWorkerTask[] taskRuns TaskRun[] taskQueues TaskQueue[] - batchTaskRuns BatchTaskRun[] environmentVariableValues EnvironmentVariableValue[] checkpoints Checkpoint[] workerDeployments WorkerDeployment[] @@ -939,7 +938,6 @@ model TaskRun { updatedAt DateTime @updatedAt attempts TaskRunAttempt[] @relation("attempts") - tags TaskRunTag[] /// Denormized column that holds the raw tags runTags String[] @@ -1004,8 +1002,6 @@ model TaskRun { CheckpointRestoreEvent CheckpointRestoreEvent[] executionSnapshots TaskRunExecutionSnapshot[] - alerts ProjectAlert[] - scheduleInstanceId String? scheduleId String? @@ -1574,8 +1570,6 @@ model TaskRunTag { friendlyId String @unique - runs TaskRun[] - project Project @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade) projectId String @@ -1674,7 +1668,6 @@ model TaskRunAttempt { checkpoints Checkpoint[] batchTaskRunItems BatchTaskRunItem[] CheckpointRestoreEvent CheckpointRestoreEvent[] - alerts ProjectAlert[] childRuns TaskRun[] @relation("TaskParentRunAttempt") @@unique([taskRunId, number]) @@ -1872,7 +1865,6 @@ model BatchTaskRun { friendlyId String @unique idempotencyKey String? idempotencyKeyExpiresAt DateTime? - runtimeEnvironment RuntimeEnvironment @relation(fields: [runtimeEnvironmentId], references: [id], onDelete: Cascade, onUpdate: Cascade) status BatchTaskRunStatus @default(PENDING) runtimeEnvironmentId String /// This only includes new runs, not idempotent runs. @@ -2373,10 +2365,10 @@ model ProjectAlert { type ProjectAlertType - taskRunAttempt TaskRunAttempt? @relation(fields: [taskRunAttemptId], references: [id], onDelete: Cascade, onUpdate: Cascade) + // Run-ops split: these reference run-subgraph rows that live on the dedicated run-ops DB for ksuid + // runs, so the cross-DB FK can't hold. Scalar-only; the run is resolved via runStore.findRun. taskRunAttemptId String? - taskRun TaskRun? @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) taskRunId String? workerDeployment WorkerDeployment? @relation(fields: [workerDeploymentId], references: [id], onDelete: Cascade, onUpdate: Cascade) @@ -2488,7 +2480,6 @@ model BulkActionGroup { environmentId String? type BulkActionType - items BulkActionItem[] /// When the group is created it's pending. After we've processed all the items it's completed. This does not mean the associated runs are completed. status BulkActionStatus @default(PENDING) @@ -2556,7 +2547,6 @@ model BulkActionItem { /// @deprecated not used in new BulkActions friendlyId String? - group BulkActionGroup @relation(fields: [groupId], references: [id], onDelete: Cascade, onUpdate: Cascade) groupId String type BulkActionType diff --git a/internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts b/internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts new file mode 100644 index 00000000000..7dfffb4ee6c --- /dev/null +++ b/internal-packages/run-store/src/PostgresRunStore.controlPlaneAlertFk.test.ts @@ -0,0 +1,133 @@ +// ProjectAlert.taskRunId/taskRunAttemptId FKs point INTO the run subgraph. A ksuid run lives ONLY +// on the dedicated run-ops DB (prisma17), so `projectAlert.create({ taskRunId: })` on +// control-plane (prisma14) violates the FK and the alert is silently dropped. After the FK drop + +// @relation removal the create succeeds; the read path resolves the run via runStore.findRun. +// Asserts the create succeeds: it fails with an FK violation before the fix and succeeds after. + +import { heteroRunOpsPostgresTest } from "@internal/testcontainers"; +import type { PrismaClient } from "@trigger.dev/database"; +import type { RunOpsPrismaClient } from "@internal/run-ops-database"; +import { describe, expect } from "vitest"; + +// 27-char internal id → ksuid → NEW (lives only on the dedicated run-ops DB). +const KSUID_27 = "k".repeat(27); + +async function seedControlPlaneAlertPrereqs(prisma: PrismaClient, suffix: string) { + const organization = await prisma.organization.create({ + data: { title: `Org ${suffix}`, slug: `org-${suffix}` }, + }); + const project = await prisma.project.create({ + data: { + name: `Project ${suffix}`, + slug: `project-${suffix}`, + externalRef: `proj_${suffix}`, + organizationId: organization.id, + }, + }); + const environment = await prisma.runtimeEnvironment.create({ + data: { + type: "PRODUCTION", + slug: "prod", + projectId: project.id, + organizationId: organization.id, + apiKey: `tr_prod_${suffix}`, + pkApiKey: `pk_prod_${suffix}`, + shortcode: `short_${suffix}`, + }, + }); + const channel = await prisma.projectAlertChannel.create({ + data: { + friendlyId: `alert_channel_${suffix}`, + type: "EMAIL", + name: "Email", + properties: { type: "EMAIL", email: "alerts@example.com" }, + alertTypes: ["TASK_RUN"], + projectId: project.id, + }, + }); + return { organization, project, environment, channel }; +} + +describe("ProjectAlert control-plane → run-subgraph FK reconciliation", () => { + heteroRunOpsPostgresTest( + "creating a TASK_RUN alert with a ksuid taskRunId (run only on the run-ops DB) succeeds on control-plane", + async ({ prisma14, prisma17 }) => { + const suffix = "alert-ksuid"; + const { project, environment, channel } = await seedControlPlaneAlertPrereqs( + prisma14, + suffix + ); + + // The run exists ONLY on the dedicated run-ops DB (prisma17), never on control-plane. + await (prisma17 as RunOpsPrismaClient).taskRun.create({ + data: { + id: KSUID_27, + friendlyId: `run_${suffix}`, + engine: "V2", + status: "COMPLETED_WITH_ERRORS", + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + traceId: `trace_${suffix}`, + spanId: `span_${suffix}`, + queue: "task/my-task", + runtimeEnvironmentId: environment.id, + projectId: project.id, + organizationId: project.organizationId, + environmentType: "PRODUCTION", + }, + }); + + // Control-plane has no TaskRun row for KSUID_27. With the FK present this throws P2003; + // after the FK is dropped + the @relation removed it succeeds. + const alert = await prisma14.projectAlert.create({ + data: { + friendlyId: `alert_${suffix}`, + channelId: channel.id, + projectId: project.id, + environmentId: environment.id, + status: "PENDING", + type: "TASK_RUN", + taskRunId: KSUID_27, + }, + }); + + expect(alert.taskRunId).toBe(KSUID_27); + + // The scalar round-trips and can be re-read off the control-plane row (the read path resolves + // the actual run via runStore.findRun against the run-ops DB). + const reread = await prisma14.projectAlert.findUniqueOrThrow({ where: { id: alert.id } }); + expect(reread.taskRunId).toBe(KSUID_27); + }, + 120_000 + ); + + heteroRunOpsPostgresTest( + "creating a TASK_RUN_ATTEMPT alert with a ksuid taskRunAttemptId (attempt only on the run-ops DB) succeeds on control-plane", + async ({ prisma14 }) => { + const suffix = "alert-ksuid-attempt"; + const { project, environment, channel } = await seedControlPlaneAlertPrereqs( + prisma14, + suffix + ); + + // A ksuid attempt id with no matching control-plane TaskRunAttempt row. With the FK present + // this throws P2003; after the FK is dropped it succeeds. + const attemptId = "a".repeat(27); + const alert = await prisma14.projectAlert.create({ + data: { + friendlyId: `alert_${suffix}`, + channelId: channel.id, + projectId: project.id, + environmentId: environment.id, + status: "PENDING", + type: "TASK_RUN_ATTEMPT", + taskRunAttemptId: attemptId, + }, + }); + + expect(alert.taskRunAttemptId).toBe(attemptId); + }, + 120_000 + ); +});