From a306598414466b35326aa52fceb9339590ee9e42 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 1 Jul 2026 16:20:00 +0100 Subject: [PATCH 1/8] feat(run-ops): dedicated run-ops database package + docker service + migration runner Co-Authored-By: Claude Opus 4.8 (1M context) --- .github/workflows/unit-tests-internal.yml | 1 + docker/Dockerfile.postgres17 | 5 + docker/docker-compose.yml | 31 + internal-packages/database/package.json | 1 + .../database/scripts/migrate-run-ops.mjs | 62 ++ internal-packages/run-ops-database/.env | 1 + internal-packages/run-ops-database/.gitignore | 6 + .../run-ops-database/package.json | 39 + .../20260629174132_init/migration.sql | 621 ++++++++++++ .../migration.sql | 5 + .../prisma/migrations/migration_lock.toml | 3 + .../prisma/schema.parity.test.ts | 85 ++ .../run-ops-database/prisma/schema.prisma | 928 ++++++++++++++++++ .../run-ops-database/src/index.ts | 4 + .../run-ops-database/tsconfig.json | 19 + .../run-ops-database/vitest.config.ts | 10 + internal-packages/testcontainers/package.json | 1 + .../src/heteroDedicated.test.ts | 26 + internal-packages/testcontainers/src/index.ts | 215 +++- internal-packages/testcontainers/src/utils.ts | 119 ++- package.json | 1 + pnpm-lock.yaml | 124 ++- 22 files changed, 2202 insertions(+), 105 deletions(-) create mode 100644 docker/Dockerfile.postgres17 create mode 100644 internal-packages/database/scripts/migrate-run-ops.mjs create mode 120000 internal-packages/run-ops-database/.env create mode 100644 internal-packages/run-ops-database/.gitignore create mode 100644 internal-packages/run-ops-database/package.json create mode 100644 internal-packages/run-ops-database/prisma/migrations/20260629174132_init/migration.sql create mode 100644 internal-packages/run-ops-database/prisma/migrations/20260630000000_task_run_waitpoint_null_batchindex_partial_unique/migration.sql create mode 100644 internal-packages/run-ops-database/prisma/migrations/migration_lock.toml create mode 100644 
internal-packages/run-ops-database/prisma/schema.parity.test.ts create mode 100644 internal-packages/run-ops-database/prisma/schema.prisma create mode 100644 internal-packages/run-ops-database/src/index.ts create mode 100644 internal-packages/run-ops-database/tsconfig.json create mode 100644 internal-packages/run-ops-database/vitest.config.ts create mode 100644 internal-packages/testcontainers/src/heteroDedicated.test.ts diff --git a/.github/workflows/unit-tests-internal.yml b/.github/workflows/unit-tests-internal.yml index c5b357b22c5..f0b12335611 100644 --- a/.github/workflows/unit-tests-internal.yml +++ b/.github/workflows/unit-tests-internal.yml @@ -95,6 +95,7 @@ jobs: } echo "Pre-pulling Docker images with authenticated session..." pull postgres:14 + pull postgres:17 # for the heterogeneous PG14/17 test fixture gate (heteroPostgresTest) pull clickhouse/clickhouse-server:26.2.19.43-alpine@sha256:c6ad6a7eb2fb5999df3adfb8b69a0c7222c68fa9b8f6b04a088564ebbc959251 pull redis:7.2 pull testcontainers/ryuk:0.14.0 diff --git a/docker/Dockerfile.postgres17 b/docker/Dockerfile.postgres17 new file mode 100644 index 00000000000..3dbcb755f57 --- /dev/null +++ b/docker/Dockerfile.postgres17 @@ -0,0 +1,5 @@ +FROM postgres:17 + +RUN apt-get update \ + && apt-get install -y --no-install-recommends postgresql-17-partman \ + && rm -rf /var/lib/apt/lists/* diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 52b0639bb32..46574c2677c 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -11,6 +11,7 @@ name: triggerdotdev-docker volumes: database-data: database-data-alt: + database-runops-data: database-replica-data: redis-data: minio-data: @@ -51,6 +52,36 @@ services: - -c - max_connections=500 + # Opt-in NEW run-ops database, PG17 (the `database` above is PG14) — a separate cluster + # so the run-ops split's distinct-DB sentinel passes. 
Start with: + # COMPOSE_PROFILES=runops pnpm run docker + database-runops: + container_name: ${CONTAINER_PREFIX:-}database-runops + profiles: ["runops"] + build: + context: . + dockerfile: Dockerfile.postgres17 + restart: always + volumes: + - ${DB_RUNOPS_VOLUME:-database-runops-data}:/var/lib/postgresql/data/ + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: postgres + networks: + - app_network + ports: + - "${POSTGRES_RUNOPS_HOST_PORT:-5434}:5432" + command: + - -c + - listen_addresses=* + - -c + - wal_level=logical + - -c + - shared_preload_libraries=pg_partman_bgw + - -c + - max_connections=500 + # Opt-in streaming read replica with configurable apply lag — a dial-a-lag rig for # testing replica-race behavior (e.g. the realtime read-your-writes gate) locally. # Start with: COMPOSE_PROFILES=replica pnpm run docker diff --git a/internal-packages/database/package.json b/internal-packages/database/package.json index ec0bc950b8c..55ef7cf47de 100644 --- a/internal-packages/database/package.json +++ b/internal-packages/database/package.json @@ -18,6 +18,7 @@ "generate": "prisma generate", "db:migrate:dev:create": "prisma migrate dev --create-only", "db:migrate:deploy": "prisma migrate deploy", + "db:migrate:run-ops": "node scripts/migrate-run-ops.mjs", "db:push": "prisma db push", "db:studio": "prisma studio", "db:reset": "prisma migrate reset", diff --git a/internal-packages/database/scripts/migrate-run-ops.mjs b/internal-packages/database/scripts/migrate-run-ops.mjs new file mode 100644 index 00000000000..7ec3dd996ed --- /dev/null +++ b/internal-packages/database/scripts/migrate-run-ops.mjs @@ -0,0 +1,74 @@ +// Apply Prisma migrations to the RUN-OPS database — the second physical DB in the run-ops +// split. The standard `db:migrate` only targets DATABASE_URL (the control-plane DB), so the +// run-ops DB must be migrated explicitly or its schema drifts (e.g. 
cross-DB FKs that were +// dropped on the control-plane DB linger on the run-ops DB and break inserts). +// +// The run-ops connection comes from TASK_RUN_DATABASE_URL / TASK_RUN_DATABASE_DIRECT_URL +// (set directly in deploy environments; read from the local .env otherwise). We then run +// `prisma migrate deploy` with DATABASE_URL/DIRECT_URL pointed at it. +import { spawnSync } from "node:child_process"; +import { readFileSync } from "node:fs"; +import { dirname, resolve } from "node:path"; +import { fileURLToPath } from "node:url"; + +const dbPackageRoot = resolve(dirname(fileURLToPath(import.meta.url)), ".."); + +function readFromEnvFiles(key) { + for (const file of [resolve(dbPackageRoot, ".env"), resolve(dbPackageRoot, "../../.env")]) { + let contents; + try { + contents = readFileSync(file, "utf8"); + } catch { + continue; + } + for (const line of contents.split("\n")) { + const match = line.match(/^\s*([A-Z0-9_]+)\s*=\s*(.*?)\s*$/); + if (!match || match[1] !== key) continue; + let value = match[2]; + if ( + (value.startsWith('"') && value.endsWith('"')) || + (value.startsWith("'") && value.endsWith("'")) + ) { + value = value.slice(1, -1); + } + if (value) return value; + } + } + return undefined; +} + +const resolveVar = (key) => process.env[key] || readFromEnvFiles(key); +const redact = (url) => url.replace(/:\/\/[^@]*@/, "://***@"); + +const databaseUrl = resolveVar("TASK_RUN_DATABASE_URL"); +const directUrl = resolveVar("TASK_RUN_DATABASE_DIRECT_URL") || databaseUrl; + +if (!databaseUrl) { + console.error( + "db:migrate:run-ops: TASK_RUN_DATABASE_URL is not set (checked env and .env). " + + "It is the run-ops database in the split — nothing to migrate without it." 
+ ); + process.exit(1); +} + +console.log(`Applying Prisma migrations to the run-ops database (${redact(databaseUrl)})`); + +const result = spawnSync("prisma", ["migrate", "deploy"], { + cwd: dbPackageRoot, + stdio: "inherit", + env: { ...process.env, DATABASE_URL: databaseUrl, DIRECT_URL: directUrl }, +}); + +// A failed launch (e.g. `prisma` not on PATH) sets result.error and leaves status null; with +// stdio inherited nothing is printed, so report the cause instead of exiting 1 silently. +if (result.error) { + console.error(`db:migrate:run-ops: failed to run prisma: ${result.error.message}`); + process.exit(1); +} + +// status is also null when prisma was killed by a signal; name it before exiting non-zero. +if (result.signal) { + console.error(`db:migrate:run-ops: prisma was terminated by signal ${result.signal}`); +} + +process.exit(result.status ?? 1); diff --git a/internal-packages/run-ops-database/.env b/internal-packages/run-ops-database/.env new file mode 120000 index 00000000000..c7360fb82d2 --- /dev/null +++ b/internal-packages/run-ops-database/.env @@ -0,0 +1 @@ +../../.env \ No newline at end of file diff --git a/internal-packages/run-ops-database/.gitignore b/internal-packages/run-ops-database/.gitignore new file mode 100644 index 00000000000..703c6f44f60 --- /dev/null +++ b/internal-packages/run-ops-database/.gitignore @@ -0,0 +1,6 @@ +node_modules +dist +generated/run-ops + +# Ensure the .env symlink is not removed by accident +!.env diff --git a/internal-packages/run-ops-database/package.json b/internal-packages/run-ops-database/package.json new file mode 100644 index 00000000000..6c26398c9c7 --- /dev/null +++ b/internal-packages/run-ops-database/package.json @@ -0,0 +1,39 @@ +{ + "name": "@internal/run-ops-database", + "private": true, + "version": "0.0.1", + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "exports": { + ".": { + "@triggerdotdev/source": "./src/index.ts", + "types": "./dist/index.d.ts", + "import": "./dist/index.js", + 
"default": "./dist/index.js" + }, + "./prisma/schema.prisma": "./prisma/schema.prisma", + "./prisma/*": "./prisma/*", + "./package.json": "./package.json" + }, + "dependencies": { + "@prisma/client": "6.14.0", + "prisma": "6.14.0" + }, + "devDependencies": { + "rimraf": "6.0.1", + "vitest": "4.1.7" + }, + "scripts": { + "clean": "rimraf dist", + "generate": "prisma generate", + "db:migrate:dev:create": "prisma migrate dev --create-only", + "db:migrate:deploy": "prisma migrate deploy", + "db:push": "prisma db push", 
"test": "vitest run", + "typecheck": "tsc --noEmit", + "build": "pnpm run clean && tsc --noEmit false --outDir dist --declaration", + "dev": "tsc --noEmit false --outDir dist --declaration --watch", + "db:studio": "prisma studio", + "db:reset": "prisma migrate reset" + } +} diff --git a/internal-packages/run-ops-database/prisma/migrations/20260629174132_init/migration.sql b/internal-packages/run-ops-database/prisma/migrations/20260629174132_init/migration.sql new file mode 100644 index 00000000000..6a18631b6eb --- /dev/null +++ b/internal-packages/run-ops-database/prisma/migrations/20260629174132_init/migration.sql @@ -0,0 +1,621 @@ +-- CreateEnum +CREATE TYPE "public"."RuntimeEnvironmentType" AS ENUM ('PRODUCTION', 'STAGING', 'DEVELOPMENT', 'PREVIEW'); + +-- CreateEnum +CREATE TYPE "public"."TaskRunStatus" AS ENUM ('DELAYED', 'PENDING', 'PENDING_VERSION', 'WAITING_FOR_DEPLOY', 'DEQUEUED', 'EXECUTING', 'WAITING_TO_RESUME', 'RETRYING_AFTER_FAILURE', 'PAUSED', 'CANCELED', 'INTERRUPTED', 'COMPLETED_SUCCESSFULLY', 'COMPLETED_WITH_ERRORS', 'SYSTEM_FAILURE', 'CRASHED', 'EXPIRED', 'TIMED_OUT'); + +-- CreateEnum +CREATE TYPE "public"."RunEngineVersion" AS ENUM ('V1', 'V2'); + +-- CreateEnum +CREATE TYPE "public"."TaskRunExecutionStatus" AS ENUM ('RUN_CREATED', 'DELAYED', 'QUEUED', 'QUEUED_EXECUTING', 'PENDING_EXECUTING', 'EXECUTING', 'EXECUTING_WITH_WAITPOINTS', 'SUSPENDED', 'PENDING_CANCEL', 'FINISHED'); + +-- CreateEnum +CREATE TYPE "public"."TaskRunCheckpointType" AS ENUM ('DOCKER', 'KUBERNETES', 'COMPUTE'); + +-- CreateEnum +CREATE TYPE "public"."WaitpointType" AS ENUM ('RUN', 'DATETIME', 'MANUAL', 'BATCH'); + +-- CreateEnum +CREATE TYPE "public"."WaitpointStatus" AS ENUM ('PENDING', 'COMPLETED'); + +-- CreateEnum +CREATE TYPE "public"."TaskRunAttemptStatus" AS ENUM ('PENDING', 'EXECUTING', 'PAUSED', 'FAILED', 'CANCELED', 'COMPLETED'); + +-- CreateEnum +CREATE TYPE "public"."BatchTaskRunStatus" AS ENUM ('PENDING', 'PROCESSING', 'COMPLETED', 'PARTIAL_FAILED', 
'ABORTED'); + +-- CreateEnum +CREATE TYPE "public"."BatchTaskRunItemStatus" AS ENUM ('PENDING', 'FAILED', 'CANCELED', 'COMPLETED'); + +-- CreateEnum +CREATE TYPE "public"."CheckpointType" AS ENUM ('DOCKER', 'KUBERNETES'); + +-- CreateEnum +CREATE TYPE "public"."CheckpointRestoreEventType" AS ENUM ('CHECKPOINT', 'RESTORE'); + +-- CreateTable +CREATE TABLE "public"."TaskRun" ( + "id" TEXT NOT NULL, + "number" INTEGER NOT NULL DEFAULT 0, + "friendlyId" TEXT NOT NULL, + "engine" "public"."RunEngineVersion" NOT NULL DEFAULT 'V1', + "status" "public"."TaskRunStatus" NOT NULL DEFAULT 'PENDING', + "statusReason" TEXT, + "idempotencyKey" TEXT, + "idempotencyKeyExpiresAt" TIMESTAMP(3), + "idempotencyKeyOptions" JSONB, + "debounce" JSONB, + "taskIdentifier" TEXT NOT NULL, + "isTest" BOOLEAN NOT NULL DEFAULT false, + "payload" TEXT NOT NULL, + "payloadType" TEXT NOT NULL DEFAULT 'application/json', + "context" JSONB, + "traceContext" JSONB, + "traceId" TEXT NOT NULL, + "spanId" TEXT NOT NULL, + "runtimeEnvironmentId" TEXT NOT NULL, + "environmentType" "public"."RuntimeEnvironmentType", + "projectId" TEXT NOT NULL, + "organizationId" TEXT, + "queue" TEXT NOT NULL, + "lockedQueueId" TEXT, + "masterQueue" TEXT NOT NULL DEFAULT 'main', + "region" TEXT, + "secondaryMasterQueue" TEXT, + "attemptNumber" INTEGER, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "runTags" TEXT[], + "taskVersion" TEXT, + "sdkVersion" TEXT, + "cliVersion" TEXT, + "startedAt" TIMESTAMP(3), + "executedAt" TIMESTAMP(3), + "completedAt" TIMESTAMP(3), + "machinePreset" TEXT, + "usageDurationMs" INTEGER NOT NULL DEFAULT 0, + "costInCents" DOUBLE PRECISION NOT NULL DEFAULT 0, + "baseCostInCents" DOUBLE PRECISION NOT NULL DEFAULT 0, + "lockedAt" TIMESTAMP(3), + "lockedById" TEXT, + "lockedToVersionId" TEXT, + "priorityMs" INTEGER NOT NULL DEFAULT 0, + "concurrencyKey" TEXT, + "delayUntil" TIMESTAMP(3), + "queuedAt" TIMESTAMP(3), + "ttl" TEXT, + "expiredAt" 
TIMESTAMP(3), + "maxAttempts" INTEGER, + "lockedRetryConfig" JSONB, + "oneTimeUseToken" TEXT, + "taskEventStore" TEXT NOT NULL DEFAULT 'taskEvent', + "queueTimestamp" TIMESTAMP(3), + "scheduleInstanceId" TEXT, + "scheduleId" TEXT, + "bulkActionGroupIds" TEXT[] DEFAULT ARRAY[]::TEXT[], + "logsDeletedAt" TIMESTAMP(3), + "replayedFromTaskRunFriendlyId" TEXT, + "rootTaskRunId" TEXT, + "parentTaskRunId" TEXT, + "parentTaskRunAttemptId" TEXT, + "batchId" TEXT, + "resumeParentOnCompletion" BOOLEAN NOT NULL DEFAULT false, + "depth" INTEGER NOT NULL DEFAULT 0, + "parentSpanId" TEXT, + "runChainState" JSONB, + "seedMetadata" TEXT, + "seedMetadataType" TEXT NOT NULL DEFAULT 'application/json', + "metadata" TEXT, + "metadataType" TEXT NOT NULL DEFAULT 'application/json', + "metadataVersion" INTEGER NOT NULL DEFAULT 1, + "annotations" JSONB, + "isWarmStart" BOOLEAN, + "output" TEXT, + "outputType" TEXT NOT NULL DEFAULT 'application/json', + "error" JSONB, + "planType" TEXT, + "maxDurationInSeconds" INTEGER, + "realtimeStreamsVersion" TEXT NOT NULL DEFAULT 'v1', + "realtimeStreams" TEXT[] DEFAULT ARRAY[]::TEXT[], + "streamBasinName" TEXT, + + CONSTRAINT "TaskRun_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."TaskRunExecutionSnapshot" ( + "id" TEXT NOT NULL, + "engine" "public"."RunEngineVersion" NOT NULL DEFAULT 'V2', + "executionStatus" "public"."TaskRunExecutionStatus" NOT NULL, + "description" TEXT NOT NULL, + "isValid" BOOLEAN NOT NULL DEFAULT true, + "error" TEXT, + "previousSnapshotId" TEXT, + "runId" TEXT NOT NULL, + "runStatus" "public"."TaskRunStatus" NOT NULL, + "batchId" TEXT, + "attemptNumber" INTEGER, + "environmentId" TEXT NOT NULL, + "environmentType" "public"."RuntimeEnvironmentType" NOT NULL, + "projectId" TEXT NOT NULL, + "organizationId" TEXT NOT NULL, + "completedWaitpointOrder" TEXT[], + "checkpointId" TEXT, + "workerId" TEXT, + "runnerId" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) 
NOT NULL, + "lastHeartbeatAt" TIMESTAMP(3), + "metadata" JSONB, + + CONSTRAINT "TaskRunExecutionSnapshot_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."TaskRunCheckpoint" ( + "id" TEXT NOT NULL, + "friendlyId" TEXT NOT NULL, + "type" "public"."TaskRunCheckpointType" NOT NULL, + "location" TEXT NOT NULL, + "imageRef" TEXT, + "reason" TEXT, + "metadata" TEXT, + "projectId" TEXT NOT NULL, + "runtimeEnvironmentId" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "TaskRunCheckpoint_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."Waitpoint" ( + "id" TEXT NOT NULL, + "friendlyId" TEXT NOT NULL, + "type" "public"."WaitpointType" NOT NULL, + "status" "public"."WaitpointStatus" NOT NULL DEFAULT 'PENDING', + "completedAt" TIMESTAMP(3), + "idempotencyKey" TEXT NOT NULL, + "userProvidedIdempotencyKey" BOOLEAN NOT NULL, + "idempotencyKeyExpiresAt" TIMESTAMP(3), + "inactiveIdempotencyKey" TEXT, + "completedByTaskRunId" TEXT, + "completedAfter" TIMESTAMP(3), + "completedByBatchId" TEXT, + "output" TEXT, + "outputType" TEXT NOT NULL DEFAULT 'application/json', + "outputIsError" BOOLEAN NOT NULL DEFAULT false, + "projectId" TEXT NOT NULL, + "environmentId" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "tags" TEXT[], + + CONSTRAINT "Waitpoint_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."TaskRunWaitpoint" ( + "id" TEXT NOT NULL, + "taskRunId" TEXT NOT NULL, + "waitpointId" TEXT NOT NULL, + "projectId" TEXT NOT NULL, + "spanIdToComplete" TEXT, + "batchId" TEXT, + "batchIndex" INTEGER, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "TaskRunWaitpoint_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."WaitpointRunConnection" ( + "id" TEXT NOT NULL, + "taskRunId" TEXT NOT NULL, + 
"waitpointId" TEXT NOT NULL, + + CONSTRAINT "WaitpointRunConnection_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."CompletedWaitpoint" ( + "id" TEXT NOT NULL, + "snapshotId" TEXT NOT NULL, + "waitpointId" TEXT NOT NULL, + + CONSTRAINT "CompletedWaitpoint_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."WaitpointTag" ( + "id" TEXT NOT NULL, + "name" TEXT NOT NULL, + "environmentId" TEXT NOT NULL, + "projectId" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "WaitpointTag_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."TaskRunTag" ( + "id" TEXT NOT NULL, + "name" TEXT NOT NULL, + "friendlyId" TEXT NOT NULL, + "projectId" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "TaskRunTag_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."TaskRunDependency" ( + "id" TEXT NOT NULL, + "taskRunId" TEXT NOT NULL, + "checkpointEventId" TEXT, + "dependentAttemptId" TEXT, + "dependentBatchRunId" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "resumedAt" TIMESTAMP(3), + + CONSTRAINT "TaskRunDependency_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."TaskRunAttempt" ( + "id" TEXT NOT NULL, + "number" INTEGER NOT NULL DEFAULT 0, + "friendlyId" TEXT NOT NULL, + "taskRunId" TEXT NOT NULL, + "backgroundWorkerId" TEXT NOT NULL, + "backgroundWorkerTaskId" TEXT NOT NULL, + "runtimeEnvironmentId" TEXT NOT NULL, + "queueId" TEXT NOT NULL, + "status" "public"."TaskRunAttemptStatus" NOT NULL DEFAULT 'PENDING', + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "startedAt" TIMESTAMP(3), + "completedAt" TIMESTAMP(3), + "usageDurationMs" INTEGER NOT NULL DEFAULT 0, + "error" JSONB, + "output" TEXT, + "outputType" TEXT NOT NULL DEFAULT 'application/json', + + CONSTRAINT "TaskRunAttempt_pkey" 
PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."BatchTaskRun" ( + "id" TEXT NOT NULL, + "friendlyId" TEXT NOT NULL, + "idempotencyKey" TEXT, + "idempotencyKeyExpiresAt" TIMESTAMP(3), + "status" "public"."BatchTaskRunStatus" NOT NULL DEFAULT 'PENDING', + "runtimeEnvironmentId" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "runIds" TEXT[] DEFAULT ARRAY[]::TEXT[], + "runCount" INTEGER NOT NULL DEFAULT 0, + "payload" TEXT, + "payloadType" TEXT NOT NULL DEFAULT 'application/json', + "options" JSONB, + "batchVersion" TEXT NOT NULL DEFAULT 'v1', + "sealed" BOOLEAN NOT NULL DEFAULT false, + "sealedAt" TIMESTAMP(3), + "expectedCount" INTEGER NOT NULL DEFAULT 0, + "completedCount" INTEGER NOT NULL DEFAULT 0, + "completedAt" TIMESTAMP(3), + "resumedAt" TIMESTAMP(3), + "processingJobsCount" INTEGER NOT NULL DEFAULT 0, + "processingJobsExpectedCount" INTEGER NOT NULL DEFAULT 0, + "oneTimeUseToken" TEXT, + "processingStartedAt" TIMESTAMP(3), + "processingCompletedAt" TIMESTAMP(3), + "successfulRunCount" INTEGER, + "failedRunCount" INTEGER, + "taskIdentifier" TEXT, + "checkpointEventId" TEXT, + "dependentTaskAttemptId" TEXT, + + CONSTRAINT "BatchTaskRun_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."BatchTaskRunItem" ( + "id" TEXT NOT NULL, + "status" "public"."BatchTaskRunItemStatus" NOT NULL DEFAULT 'PENDING', + "batchTaskRunId" TEXT NOT NULL, + "taskRunId" TEXT NOT NULL, + "taskRunAttemptId" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "completedAt" TIMESTAMP(3), + + CONSTRAINT "BatchTaskRunItem_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."BatchTaskRunError" ( + "id" TEXT NOT NULL, + "batchTaskRunId" TEXT NOT NULL, + "index" INTEGER NOT NULL, + "taskIdentifier" TEXT NOT NULL, + "payload" TEXT, + "options" JSONB, + "error" TEXT NOT NULL, + "errorCode" TEXT, + "createdAt" 
TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "BatchTaskRunError_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."Checkpoint" ( + "id" TEXT NOT NULL, + "friendlyId" TEXT NOT NULL, + "type" "public"."CheckpointType" NOT NULL, + "location" TEXT NOT NULL, + "imageRef" TEXT NOT NULL, + "reason" TEXT, + "metadata" TEXT, + "runId" TEXT NOT NULL, + "attemptId" TEXT NOT NULL, + "attemptNumber" INTEGER, + "projectId" TEXT NOT NULL, + "runtimeEnvironmentId" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "Checkpoint_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."CheckpointRestoreEvent" ( + "id" TEXT NOT NULL, + "type" "public"."CheckpointRestoreEventType" NOT NULL, + "reason" TEXT, + "metadata" TEXT, + "checkpointId" TEXT NOT NULL, + "runId" TEXT NOT NULL, + "attemptId" TEXT NOT NULL, + "projectId" TEXT NOT NULL, + "runtimeEnvironmentId" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "CheckpointRestoreEvent_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "TaskRun_friendlyId_key" ON "public"."TaskRun"("friendlyId"); + +-- CreateIndex +CREATE INDEX "TaskRun_parentTaskRunId_idx" ON "public"."TaskRun"("parentTaskRunId"); + +-- CreateIndex +CREATE INDEX "TaskRun_spanId_idx" ON "public"."TaskRun"("spanId"); + +-- CreateIndex +CREATE INDEX "TaskRun_parentSpanId_idx" ON "public"."TaskRun"("parentSpanId"); + +-- CreateIndex +CREATE INDEX "TaskRun_runTags_idx" ON "public"."TaskRun" USING GIN ("runTags" array_ops); + +-- CreateIndex +CREATE INDEX "TaskRun_runtimeEnvironmentId_batchId_idx" ON "public"."TaskRun"("runtimeEnvironmentId", "batchId"); + +-- CreateIndex +CREATE INDEX "TaskRun_runtimeEnvironmentId_createdAt_idx" ON "public"."TaskRun"("runtimeEnvironmentId", "createdAt" DESC); + +-- CreateIndex +CREATE INDEX "TaskRun_createdAt_idx" 
ON "public"."TaskRun" USING BRIN ("createdAt"); + +-- CreateIndex +CREATE UNIQUE INDEX "TaskRun_oneTimeUseToken_key" ON "public"."TaskRun"("oneTimeUseToken"); + +-- CreateIndex +CREATE UNIQUE INDEX "TaskRun_runtimeEnvironmentId_taskIdentifier_idempotencyKey_key" ON "public"."TaskRun"("runtimeEnvironmentId", "taskIdentifier", "idempotencyKey"); + +-- CreateIndex +CREATE INDEX "TaskRunExecutionSnapshot_runId_isValid_createdAt_idx" ON "public"."TaskRunExecutionSnapshot"("runId", "isValid", "createdAt" DESC); + +-- CreateIndex +CREATE UNIQUE INDEX "TaskRunCheckpoint_friendlyId_key" ON "public"."TaskRunCheckpoint"("friendlyId"); + +-- CreateIndex +CREATE UNIQUE INDEX "Waitpoint_friendlyId_key" ON "public"."Waitpoint"("friendlyId"); + +-- CreateIndex +CREATE UNIQUE INDEX "Waitpoint_completedByTaskRunId_key" ON "public"."Waitpoint"("completedByTaskRunId"); + +-- CreateIndex +CREATE INDEX "Waitpoint_completedByBatchId_idx" ON "public"."Waitpoint"("completedByBatchId"); + +-- CreateIndex +CREATE INDEX "Waitpoint_environmentId_type_createdAt_idx" ON "public"."Waitpoint"("environmentId", "type", "createdAt" DESC); + +-- CreateIndex +CREATE INDEX "Waitpoint_environmentId_type_status_idx" ON "public"."Waitpoint"("environmentId", "type", "status"); + +-- CreateIndex +CREATE INDEX "Waitpoint_environmentId_type_id_idx" ON "public"."Waitpoint"("environmentId", "type", "id" DESC); + +-- CreateIndex +CREATE UNIQUE INDEX "Waitpoint_environmentId_idempotencyKey_key" ON "public"."Waitpoint"("environmentId", "idempotencyKey"); + +-- CreateIndex +CREATE INDEX "TaskRunWaitpoint_taskRunId_idx" ON "public"."TaskRunWaitpoint"("taskRunId"); + +-- CreateIndex +CREATE INDEX "TaskRunWaitpoint_waitpointId_idx" ON "public"."TaskRunWaitpoint"("waitpointId"); + +-- CreateIndex +CREATE UNIQUE INDEX "TaskRunWaitpoint_taskRunId_waitpointId_batchIndex_key" ON "public"."TaskRunWaitpoint"("taskRunId", "waitpointId", "batchIndex"); + +-- CreateIndex +CREATE INDEX "WaitpointRunConnection_taskRunId_idx" ON 
"public"."WaitpointRunConnection"("taskRunId"); + +-- CreateIndex +CREATE INDEX "WaitpointRunConnection_waitpointId_idx" ON "public"."WaitpointRunConnection"("waitpointId"); + +-- CreateIndex +CREATE UNIQUE INDEX "WaitpointRunConnection_taskRunId_waitpointId_key" ON "public"."WaitpointRunConnection"("taskRunId", "waitpointId"); + +-- CreateIndex +CREATE INDEX "CompletedWaitpoint_snapshotId_idx" ON "public"."CompletedWaitpoint"("snapshotId"); + +-- CreateIndex +CREATE INDEX "CompletedWaitpoint_waitpointId_idx" ON "public"."CompletedWaitpoint"("waitpointId"); + +-- CreateIndex +CREATE UNIQUE INDEX "CompletedWaitpoint_snapshotId_waitpointId_key" ON "public"."CompletedWaitpoint"("snapshotId", "waitpointId"); + +-- CreateIndex +CREATE UNIQUE INDEX "WaitpointTag_environmentId_name_key" ON "public"."WaitpointTag"("environmentId", "name"); + +-- CreateIndex +CREATE UNIQUE INDEX "TaskRunTag_friendlyId_key" ON "public"."TaskRunTag"("friendlyId"); + +-- CreateIndex +CREATE INDEX "TaskRunTag_name_id_idx" ON "public"."TaskRunTag"("name", "id"); + +-- CreateIndex +CREATE UNIQUE INDEX "TaskRunTag_projectId_name_key" ON "public"."TaskRunTag"("projectId", "name"); + +-- CreateIndex +CREATE UNIQUE INDEX "TaskRunDependency_taskRunId_key" ON "public"."TaskRunDependency"("taskRunId"); + +-- CreateIndex +CREATE UNIQUE INDEX "TaskRunDependency_checkpointEventId_key" ON "public"."TaskRunDependency"("checkpointEventId"); + +-- CreateIndex +CREATE INDEX "TaskRunDependency_dependentAttemptId_idx" ON "public"."TaskRunDependency"("dependentAttemptId"); + +-- CreateIndex +CREATE INDEX "TaskRunDependency_dependentBatchRunId_idx" ON "public"."TaskRunDependency"("dependentBatchRunId"); + +-- CreateIndex +CREATE UNIQUE INDEX "TaskRunAttempt_friendlyId_key" ON "public"."TaskRunAttempt"("friendlyId"); + +-- CreateIndex +CREATE INDEX "TaskRunAttempt_taskRunId_idx" ON "public"."TaskRunAttempt"("taskRunId"); + +-- CreateIndex +CREATE UNIQUE INDEX "TaskRunAttempt_taskRunId_number_key" ON 
"public"."TaskRunAttempt"("taskRunId", "number"); + +-- CreateIndex +CREATE UNIQUE INDEX "BatchTaskRun_friendlyId_key" ON "public"."BatchTaskRun"("friendlyId"); + +-- CreateIndex +CREATE UNIQUE INDEX "BatchTaskRun_checkpointEventId_key" ON "public"."BatchTaskRun"("checkpointEventId"); + +-- CreateIndex +CREATE INDEX "BatchTaskRun_dependentTaskAttemptId_idx" ON "public"."BatchTaskRun"("dependentTaskAttemptId"); + +-- CreateIndex +CREATE INDEX "BatchTaskRun_runtimeEnvironmentId_id_idx" ON "public"."BatchTaskRun"("runtimeEnvironmentId", "id" DESC); + +-- CreateIndex +CREATE UNIQUE INDEX "BatchTaskRun_oneTimeUseToken_key" ON "public"."BatchTaskRun"("oneTimeUseToken"); + +-- CreateIndex +CREATE UNIQUE INDEX "BatchTaskRun_runtimeEnvironmentId_idempotencyKey_key" ON "public"."BatchTaskRun"("runtimeEnvironmentId", "idempotencyKey"); + +-- CreateIndex +CREATE INDEX "idx_batchtaskrunitem_taskrunattempt" ON "public"."BatchTaskRunItem"("taskRunAttemptId"); + +-- CreateIndex +CREATE INDEX "idx_batchtaskrunitem_taskrun" ON "public"."BatchTaskRunItem"("taskRunId"); + +-- CreateIndex +CREATE UNIQUE INDEX "BatchTaskRunItem_batchTaskRunId_taskRunId_key" ON "public"."BatchTaskRunItem"("batchTaskRunId", "taskRunId"); + +-- CreateIndex +CREATE INDEX "BatchTaskRunError_batchTaskRunId_idx" ON "public"."BatchTaskRunError"("batchTaskRunId"); + +-- CreateIndex +CREATE UNIQUE INDEX "BatchTaskRunError_batchTaskRunId_index_key" ON "public"."BatchTaskRunError"("batchTaskRunId", "index"); + +-- CreateIndex +CREATE UNIQUE INDEX "Checkpoint_friendlyId_key" ON "public"."Checkpoint"("friendlyId"); + +-- CreateIndex +CREATE INDEX "Checkpoint_attemptId_idx" ON "public"."Checkpoint"("attemptId"); + +-- CreateIndex +CREATE INDEX "Checkpoint_runId_idx" ON "public"."Checkpoint"("runId"); + +-- CreateIndex +CREATE INDEX "CheckpointRestoreEvent_checkpointId_idx" ON "public"."CheckpointRestoreEvent"("checkpointId"); + +-- CreateIndex +CREATE INDEX "CheckpointRestoreEvent_runId_idx" ON 
"public"."CheckpointRestoreEvent"("runId"); + +-- AddForeignKey +ALTER TABLE "public"."TaskRun" ADD CONSTRAINT "TaskRun_rootTaskRunId_fkey" FOREIGN KEY ("rootTaskRunId") REFERENCES "public"."TaskRun"("id") ON DELETE SET NULL ON UPDATE NO ACTION; + +-- AddForeignKey +ALTER TABLE "public"."TaskRun" ADD CONSTRAINT "TaskRun_parentTaskRunId_fkey" FOREIGN KEY ("parentTaskRunId") REFERENCES "public"."TaskRun"("id") ON DELETE SET NULL ON UPDATE NO ACTION; + +-- AddForeignKey +ALTER TABLE "public"."TaskRun" ADD CONSTRAINT "TaskRun_parentTaskRunAttemptId_fkey" FOREIGN KEY ("parentTaskRunAttemptId") REFERENCES "public"."TaskRunAttempt"("id") ON DELETE SET NULL ON UPDATE NO ACTION; + +-- AddForeignKey +ALTER TABLE "public"."TaskRun" ADD CONSTRAINT "TaskRun_batchId_fkey" FOREIGN KEY ("batchId") REFERENCES "public"."BatchTaskRun"("id") ON DELETE SET NULL ON UPDATE NO ACTION; + +-- AddForeignKey +ALTER TABLE "public"."TaskRunExecutionSnapshot" ADD CONSTRAINT "TaskRunExecutionSnapshot_runId_fkey" FOREIGN KEY ("runId") REFERENCES "public"."TaskRun"("id") ON DELETE RESTRICT ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."TaskRunExecutionSnapshot" ADD CONSTRAINT "TaskRunExecutionSnapshot_checkpointId_fkey" FOREIGN KEY ("checkpointId") REFERENCES "public"."TaskRunCheckpoint"("id") ON DELETE SET NULL ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."TaskRunDependency" ADD CONSTRAINT "TaskRunDependency_taskRunId_fkey" FOREIGN KEY ("taskRunId") REFERENCES "public"."TaskRun"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."TaskRunDependency" ADD CONSTRAINT "TaskRunDependency_checkpointEventId_fkey" FOREIGN KEY ("checkpointEventId") REFERENCES "public"."CheckpointRestoreEvent"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."TaskRunDependency" ADD CONSTRAINT "TaskRunDependency_dependentAttemptId_fkey" FOREIGN KEY ("dependentAttemptId") REFERENCES "public"."TaskRunAttempt"("id") ON DELETE SET 
NULL ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."TaskRunDependency" ADD CONSTRAINT "TaskRunDependency_dependentBatchRunId_fkey" FOREIGN KEY ("dependentBatchRunId") REFERENCES "public"."BatchTaskRun"("id") ON DELETE SET NULL ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."TaskRunAttempt" ADD CONSTRAINT "TaskRunAttempt_taskRunId_fkey" FOREIGN KEY ("taskRunId") REFERENCES "public"."TaskRun"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."BatchTaskRun" ADD CONSTRAINT "BatchTaskRun_checkpointEventId_fkey" FOREIGN KEY ("checkpointEventId") REFERENCES "public"."CheckpointRestoreEvent"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."BatchTaskRun" ADD CONSTRAINT "BatchTaskRun_dependentTaskAttemptId_fkey" FOREIGN KEY ("dependentTaskAttemptId") REFERENCES "public"."TaskRunAttempt"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."BatchTaskRunItem" ADD CONSTRAINT "BatchTaskRunItem_batchTaskRunId_fkey" FOREIGN KEY ("batchTaskRunId") REFERENCES "public"."BatchTaskRun"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."BatchTaskRunItem" ADD CONSTRAINT "BatchTaskRunItem_taskRunId_fkey" FOREIGN KEY ("taskRunId") REFERENCES "public"."TaskRun"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."BatchTaskRunItem" ADD CONSTRAINT "BatchTaskRunItem_taskRunAttemptId_fkey" FOREIGN KEY ("taskRunAttemptId") REFERENCES "public"."TaskRunAttempt"("id") ON DELETE SET NULL ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."BatchTaskRunError" ADD CONSTRAINT "BatchTaskRunError_batchTaskRunId_fkey" FOREIGN KEY ("batchTaskRunId") REFERENCES "public"."BatchTaskRun"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."Checkpoint" ADD CONSTRAINT "Checkpoint_runId_fkey" FOREIGN KEY ("runId") REFERENCES "public"."TaskRun"("id") ON DELETE CASCADE ON 
UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."Checkpoint" ADD CONSTRAINT "Checkpoint_attemptId_fkey" FOREIGN KEY ("attemptId") REFERENCES "public"."TaskRunAttempt"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."CheckpointRestoreEvent" ADD CONSTRAINT "CheckpointRestoreEvent_checkpointId_fkey" FOREIGN KEY ("checkpointId") REFERENCES "public"."Checkpoint"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."CheckpointRestoreEvent" ADD CONSTRAINT "CheckpointRestoreEvent_runId_fkey" FOREIGN KEY ("runId") REFERENCES "public"."TaskRun"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."CheckpointRestoreEvent" ADD CONSTRAINT "CheckpointRestoreEvent_attemptId_fkey" FOREIGN KEY ("attemptId") REFERENCES "public"."TaskRunAttempt"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/internal-packages/run-ops-database/prisma/migrations/20260630000000_task_run_waitpoint_null_batchindex_partial_unique/migration.sql b/internal-packages/run-ops-database/prisma/migrations/20260630000000_task_run_waitpoint_null_batchindex_partial_unique/migration.sql new file mode 100644 index 00000000000..062f07068be --- /dev/null +++ b/internal-packages/run-ops-database/prisma/migrations/20260630000000_task_run_waitpoint_null_batchindex_partial_unique/migration.sql @@ -0,0 +1,5 @@ +-- Partial unique index (Prisma cannot express the WHERE clause, so it is SQL-only). Mirrors the +-- control-plane `TaskRunWaitpoint_taskRunId_waitpointId_batchIndex_null_key`: without it the +-- composite (taskRunId, waitpointId, batchIndex) index treats NULL batchIndex rows as distinct, so +-- blockRunWithWaitpointEdges' ON CONFLICT DO NOTHING cannot dedupe a re-blocked NULL-batchIndex edge. 
+CREATE UNIQUE INDEX "TaskRunWaitpoint_taskRunId_waitpointId_batchIndex_null_key" ON "public"."TaskRunWaitpoint"("taskRunId", "waitpointId") WHERE "batchIndex" IS NULL; diff --git a/internal-packages/run-ops-database/prisma/migrations/migration_lock.toml b/internal-packages/run-ops-database/prisma/migrations/migration_lock.toml new file mode 100644 index 00000000000..044d57cdb0d --- /dev/null +++ b/internal-packages/run-ops-database/prisma/migrations/migration_lock.toml @@ -0,0 +1,3 @@ +# Please do not edit this file manually +# It should be added in your version-control system (e.g., Git) +provider = "postgresql" diff --git a/internal-packages/run-ops-database/prisma/schema.parity.test.ts b/internal-packages/run-ops-database/prisma/schema.parity.test.ts new file mode 100644 index 00000000000..920def9813f --- /dev/null +++ b/internal-packages/run-ops-database/prisma/schema.parity.test.ts @@ -0,0 +1,85 @@ +import { describe, expect, it } from "vitest"; +import { readFileSync } from "node:fs"; +import { resolve } from "node:path"; + +// Structural checks on the dedicated run-ops schema text: it names no +// control-plane model as a field type or relation target, it declares all 14 +// run-subgraph models, group-(A) waitpoint-block references stay FK-free, and +// group-(B) co-resident references stay real relations. +const CONTROL_PLANE_MODELS = [ + "Organization", + "OrgMember", + "Project", + "RuntimeEnvironment", + "User", + "TaskSchedule", + "BackgroundWorker", + "BackgroundWorkerTask", + "WorkerDeployment", + "TaskQueue", +]; + +function readSchema(rel: string) { + return readFileSync(resolve(__dirname, rel), "utf8"); +} + +describe("dedicated run-ops schema parity", () => { + it("references no control-plane model as a relation target", () => { + const dedicated = readSchema("./schema.prisma"); + for (const model of CONTROL_PLANE_MODELS) { + // A relation target appears as ` fieldName Model @relation(...)`.
A bare + // scalar column like `projectId String` is fine; the model TYPE must be absent. + const relationTarget = new RegExp( + `@relation[^\\n]*\\b${model}\\b|\\b${model}\\b[^\\n]*@relation` + ); + expect(dedicated).not.toMatch(relationTarget); + expect(dedicated).not.toMatch(new RegExp(`\\s${model}(\\?|\\[\\])?\\s`)); + } + }); + + it("includes all 14 run-subgraph models", () => { + const dedicated = readSchema("./schema.prisma"); + for (const m of [ + "TaskRun", + "TaskRunAttempt", + "TaskRunExecutionSnapshot", + "TaskRunWaitpoint", + "TaskRunCheckpoint", + "CheckpointRestoreEvent", + "TaskRunTag", + "Waitpoint", + "WaitpointTag", + "BatchTaskRun", + "TaskRunDependency", + "BatchTaskRunItem", + "BatchTaskRunError", + "Checkpoint", + ]) { + expect(dedicated).toMatch(new RegExp(`model ${m} \\{`)); + } + }); + + it("keeps the group-(A) waitpoint-block references FK-FREE (scalar columns / explicit FK-free join models)", () => { + const dedicated = readSchema("./schema.prisma"); + // TaskRunWaitpoint must NOT carry a `@relation` to Waitpoint/TaskRun/BatchTaskRun. + const trw = dedicated.match(/model TaskRunWaitpoint \{[\s\S]*?\n\}/)![0]; + expect(trw).not.toMatch(/@relation/); + expect(trw).toMatch(/waitpointId\s+String/); + expect(trw).toMatch(/taskRunId\s+String/); + // The two implicit M2M sets are replaced by explicit FK-free join models. + expect(dedicated).toMatch(/model WaitpointRunConnection \{/); + expect(dedicated).toMatch(/model CompletedWaitpoint \{/); + const wrc = dedicated.match(/model WaitpointRunConnection \{[\s\S]*?\n\}/)![0]; + expect(wrc).not.toMatch(/@relation/); + // Waitpoint completion back-refs are scalar, not relations. + const wp = dedicated.match(/model Waitpoint \{[\s\S]*?\n\}/)![0]; + expect(wp).not.toMatch(/completedByTaskRun\s+TaskRun\s*\?\s*@relation/); + }); + + it("keeps the group-(B) co-resident references as real FKs (e.g. 
TaskRunAttempt.taskRun)", () => { + const dedicated = readSchema("./schema.prisma"); + const attempt = dedicated.match(/model TaskRunAttempt \{[\s\S]*?\n\}/)![0]; + // The attempt->run relation stays a real FK (always co-resident). + expect(attempt).toMatch(/taskRun\s+TaskRun\s+@relation/); + }); +}); diff --git a/internal-packages/run-ops-database/prisma/schema.prisma b/internal-packages/run-ops-database/prisma/schema.prisma new file mode 100644 index 00000000000..2d749f960bf --- /dev/null +++ b/internal-packages/run-ops-database/prisma/schema.prisma @@ -0,0 +1,928 @@ +datasource db { + provider = "postgresql" + url = env("RUN_OPS_DATABASE_URL") + directUrl = env("RUN_OPS_DATABASE_DIRECT_URL") +} + +generator client { + provider = "prisma-client-js" + output = "../generated/run-ops" + binaryTargets = ["native", "debian-openssl-1.1.x"] + previewFeatures = ["metrics"] +} + +// +// ───────────────────────────────────────────────────────────────────────────── +// Dedicated run-ops subgraph schema (the NEW DB). +// +// This is a copied SUBSET of the control-plane `@trigger.dev/database` schema, +// reproduced verbatim minus: +// 1. control-plane `@relation`s (org/project/environment/worker/queue) — +// demoted to plain scalar FK columns, no `@relation`, no FK. +// 2. group-(A) cross-DB-capable waitpoint-block `@relation`s — demoted to plain +// scalar columns and explicit FK-free join models (a block edge can straddle +// the two DBs, so it MUST NOT be enforced by an FK on the NEW DB). +// group-(B) co-resident run-ops→run-ops relations are KEPT as real FKs. +// ───────────────────────────────────────────────────────────────────────────── +// + +enum RuntimeEnvironmentType { + PRODUCTION + STAGING + DEVELOPMENT + PREVIEW +} + +model TaskRun { + id String @id @default(cuid()) + + number Int @default(0) + friendlyId String @unique + + engine RunEngineVersion @default(V1) + + status TaskRunStatus @default(PENDING) + statusReason String? + + idempotencyKey String? 
+ idempotencyKeyExpiresAt DateTime? + /// Stores the user-provided key and scope: { key: string, scope: "run" | "attempt" | "global" } + idempotencyKeyOptions Json? + + /// Debounce options: { key: string, delay: string, createdAt: Date } + debounce Json? + + taskIdentifier String + + isTest Boolean @default(false) + + payload String + payloadType String @default("application/json") + context Json? + traceContext Json? + + traceId String + spanId String + + // scalarized control-plane FK (was `runtimeEnvironment @relation`) + runtimeEnvironmentId String + + environmentType RuntimeEnvironmentType? + + // scalarized control-plane FK (was `project @relation`) + projectId String + + organizationId String? + + // The specific queue this run is in + queue String + // The queueId is set when the run is locked to a specific queue + lockedQueueId String? + + /// The main queue that this run is part of + workerQueue String @default("main") @map("masterQueue") + + /// User-facing geo region, stamped at trigger; workerQueue is where it actually ran. + region String? + + /// @deprecated + secondaryMasterQueue String? + + /// From engine v2+ this will be defined after a run has been dequeued (starting at 1) + attemptNumber Int? + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + attempts TaskRunAttempt[] @relation("attempts") + + /// Denormalized column that holds the raw tags + runTags String[] + + /// Denormalized version of the background worker task + taskVersion String? + sdkVersion String? + cliVersion String? + + checkpoints Checkpoint[] + + /// startedAt marks the point at which a run is dequeued from MarQS + startedAt DateTime? + /// executedAt is set when the first attempt is about to execute + executedAt DateTime? + completedAt DateTime? + machinePreset String? + + usageDurationMs Int @default(0) + costInCents Float @default(0) + baseCostInCents Float @default(0) + + lockedAt DateTime?
+ // scalarized control-plane FK (was `lockedBy @relation`) + lockedById String? + + // scalarized control-plane FK (was `lockedToVersion @relation`) + lockedToVersionId String? + + /// The "priority" of the run. This is just a negative offset in ms for the queue timestamp + /// E.g. a value of 60_000 would put the run into the queue 60s ago. + priorityMs Int @default(0) + + concurrencyKey String? + + delayUntil DateTime? + queuedAt DateTime? + ttl String? + expiredAt DateTime? + maxAttempts Int? + lockedRetryConfig Json? + + /// optional token that can be used to authenticate the task run + oneTimeUseToken String? + + /// GROUP (A) FK-FREE: when this run is finished, the waitpoint will be marked as + /// completed. Was `associatedWaitpoint Waitpoint? @relation("CompletingRun")`. + /// The completion back-ref lives on `Waitpoint.completedByTaskRunId` (scalar). + + /// GROUP (A) FK-FREE: if there are any blocked waitpoints, the run won't be + /// executed. Was `blockedByWaitpoints TaskRunWaitpoint[]`. Block edges are the + /// rows of `TaskRunWaitpoint` keyed by the scalar `taskRunId`. + + /// GROUP (A) FK-FREE: all waitpoints that blocked this run at some point, used + /// for display purposes. Was the implicit M2M + /// `connectedWaitpoints Waitpoint[] @relation("WaitpointRunConnections")`, + /// now the explicit FK-free join model `WaitpointRunConnection`. + + /// Where the logs are stored + taskEventStore String @default("taskEvent") + + queueTimestamp DateTime? + + batchItems BatchTaskRunItem[] + dependency TaskRunDependency? + // Renamed back-ref kept verbatim from control plane + CheckpointRestoreEvent CheckpointRestoreEvent[] + executionSnapshots TaskRunExecutionSnapshot[] + + scheduleInstanceId String? + scheduleId String? + + bulkActionGroupIds String[] @default([]) + + logsDeletedAt DateTime? + + replayedFromTaskRunFriendlyId String? 
+ + /// GROUP (B) KEEP: this represents the original task that was triggered outside of a Trigger.dev task + rootTaskRun TaskRun? @relation("TaskRootRun", fields: [rootTaskRunId], references: [id], onDelete: SetNull, onUpdate: NoAction) + rootTaskRunId String? + + /// The root run will have a list of all the descendant runs, children, grand children, etc. + descendantRuns TaskRun[] @relation("TaskRootRun") + + /// GROUP (B) KEEP: the immediate parent run of this task run + parentTaskRun TaskRun? @relation("TaskParentRun", fields: [parentTaskRunId], references: [id], onDelete: SetNull, onUpdate: NoAction) + parentTaskRunId String? + + /// The immediate child runs of this task run + childRuns TaskRun[] @relation("TaskParentRun") + + /// GROUP (B) KEEP: the immediate parent attempt of this task run + parentTaskRunAttempt TaskRunAttempt? @relation("TaskParentRunAttempt", fields: [parentTaskRunAttemptId], references: [id], onDelete: SetNull, onUpdate: NoAction) + parentTaskRunAttemptId String? + + /// GROUP (B) KEEP: the batch run that this task run is a part of (co-resident) + batch BatchTaskRun? @relation(fields: [batchId], references: [id], onDelete: SetNull, onUpdate: NoAction) + batchId String? + + /// whether or not the task run was created because of a triggerAndWait or batchTriggerAndWait + resumeParentOnCompletion Boolean @default(false) + + /// The depth of this task run in the task run hierarchy + depth Int @default(0) + + /// The span ID of the "trigger" span in the parent task run + parentSpanId String? + + /// Holds the state of the run chain for deadlock detection + runChainState Json? + + /// seed run metadata + seedMetadata String? + seedMetadataType String @default("application/json") + + /// Run metadata + metadata String? + metadataType String @default("application/json") + metadataVersion Int @default(1) + + /// Structured annotations: triggerSource, triggerAction, rootTriggerSource, rootScheduleId + annotations Json?
+ + /// Whether the latest attempt was a warm start. Null until first attempt starts. + isWarmStart Boolean? + + /// Run output + output String? + outputType String @default("application/json") + + /// Run error + error Json? + + /// Organization's billing plan type (cached for fallback when billing API fails) + planType String? + + maxDurationInSeconds Int? + + /// The version of the realtime streams implementation used by the run + realtimeStreamsVersion String @default("v1") + /// Store the stream keys that are being used by the run + realtimeStreams String[] @default([]) + /// S2 basin where this run's realtime streams live. Stamped at create + /// time from `Organization.streamBasinName` so reads can resolve the + /// basin without joining org. Null when the org has no per-org basin + /// (OSS, or pre-backfill); reads fall back to the global basin. + streamBasinName String? + + @@unique([oneTimeUseToken]) + @@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey]) + // Finding child runs + @@index([parentTaskRunId]) + // Run page inspector + @@index([spanId]) + @@index([parentSpanId]) + // Finding runs in a batch + @@index([runTags(ops: ArrayOps)], type: Gin) + @@index([runtimeEnvironmentId, batchId]) + @@index([runtimeEnvironmentId, createdAt(sort: Desc)]) + @@index([createdAt], type: Brin) +} + +enum TaskRunStatus { + /// + /// NON-FINAL STATUSES + /// + + /// Task has been scheduled to run in the future + DELAYED + /// Task is waiting to be executed by a worker + PENDING + + /// The run is pending a version update because it cannot execute without additional information (task, queue, etc.). Replaces WAITING_FOR_DEPLOY + PENDING_VERSION + + /// Task hasn't been deployed yet but is waiting to be executed. 
Deprecated in favor of PENDING_VERSION + WAITING_FOR_DEPLOY + + /// Task has been dequeued from the queue but is not yet executing + DEQUEUED + + /// Task is currently being executed by a worker + EXECUTING + + /// Task has been paused by the system, and will be resumed by the system + WAITING_TO_RESUME + + /// Task has failed and is waiting to be retried + RETRYING_AFTER_FAILURE + + /// Task has been paused by the user, and can be resumed by the user + PAUSED + + /// + /// FINAL STATUSES + /// + + /// Task has been canceled by the user + CANCELED + + /// Task was interrupted during execution, mostly this happens in development environments + INTERRUPTED + + /// Task has been completed successfully + COMPLETED_SUCCESSFULLY + + /// Task has been completed with errors + COMPLETED_WITH_ERRORS + + /// Task has failed to complete, due to an error in the system + SYSTEM_FAILURE + + /// Task has crashed and won't be retried, most likely the worker ran out of resources, e.g. memory or storage + CRASHED + + /// Task reached the ttl without being executed + EXPIRED + + /// Task has been timed out when using maxDuration + TIMED_OUT +} + +enum RunEngineVersion { + /// The original version that uses marqs v1 and Graphile + V1 + V2 +} + +/// Used by the RunEngine during TaskRun execution +/// It has the required information to transactionally progress a run through states, +/// and prevent side effects like heartbeats failing a run that has progressed. +/// It is optimised for performance and is designed to be cleared at some point, +/// so there are no cascading relationships to other models. 
+model TaskRunExecutionSnapshot { + id String @id @default(cuid()) + + /// This should always be 2+ (V1 didn't use the run engine or snapshots) + engine RunEngineVersion @default(V2) + + /// The execution status + executionStatus TaskRunExecutionStatus + /// For debugging + description String + + /// We store invalid snapshots as a record of the run state when we tried to move + isValid Boolean @default(true) + error String? + + /// The previous snapshot ID + previousSnapshotId String? + + /// GROUP (B) KEEP: Run (co-resident) + runId String + run TaskRun @relation(fields: [runId], references: [id]) + runStatus TaskRunStatus + + // Batch + batchId String? + + /// This is the current run attempt number. Users can define how many attempts they want for a run. + attemptNumber Int? + + /// Environment (already plain scalars in the control plane) + environmentId String + environmentType RuntimeEnvironmentType + projectId String + organizationId String + + /// GROUP (A) FK-FREE: waitpoints that have been completed for this execution. + /// Was the implicit M2M `completedWaitpoints Waitpoint[] @relation("completedWaitpoints")`, + /// now the explicit FK-free join model `CompletedWaitpoint`. + + /// An array of waitpoint IDs in the correct order, used for batches + completedWaitpointOrder String[] + + /// GROUP (B) KEEP: Checkpoint (co-resident) + checkpointId String? + checkpoint TaskRunCheckpoint? @relation(fields: [checkpointId], references: [id]) + + /// Worker + workerId String? + runnerId String? + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + lastHeartbeatAt DateTime? + + /// Metadata used by various systems in the run engine + metadata Json? 
+ + /// Used to get the latest valid snapshot quickly + @@index([runId, isValid, createdAt(sort: Desc)]) +} + +enum TaskRunExecutionStatus { + /// Run has been created + RUN_CREATED + /// Run is delayed, waiting to be enqueued + DELAYED + /// Run is in the RunQueue + QUEUED + /// Run is in the RunQueue, and is also executing. This happens when a run is continued but cannot reacquire concurrency + QUEUED_EXECUTING + /// Run has been pulled from the queue, but isn't executing yet + PENDING_EXECUTING + /// Run is executing on a worker + EXECUTING + /// Run is executing on a worker but is waiting for waitpoints to complete + EXECUTING_WITH_WAITPOINTS + /// Run has been suspended and may be waiting for waitpoints to complete before resuming + SUSPENDED + /// Run has been scheduled for cancellation + PENDING_CANCEL + /// Run is finished (success or failure) + FINISHED +} + +model TaskRunCheckpoint { + id String @id @default(cuid()) + + friendlyId String @unique + + type TaskRunCheckpointType + location String + imageRef String? + reason String? + metadata String? + + // scalarized control-plane FK (was `project @relation`) + projectId String + + // scalarized control-plane FK (was `runtimeEnvironment @relation`) + runtimeEnvironmentId String + + executionSnapshot TaskRunExecutionSnapshot[] + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt +} + +enum TaskRunCheckpointType { + DOCKER + KUBERNETES + COMPUTE +} + +/// A Waitpoint blocks a run from continuing until it's completed +/// If there's a waitpoint blocking a run, it shouldn't be in the queue +model Waitpoint { + id String @id @default(cuid()) + + friendlyId String @unique + + type WaitpointType + status WaitpointStatus @default(PENDING) + + completedAt DateTime? + + /// If it's an Event type waitpoint, this is the event.
It can also be provided for the DATETIME type + idempotencyKey String + /// If this is true then we can show it in the dashboard/return it from the SDK + userProvidedIdempotencyKey Boolean + + /// If there's a user provided idempotency key, this is the time it expires at + idempotencyKeyExpiresAt DateTime? + + /// If an idempotencyKey is no longer active, we store it here and generate a new one for the idempotencyKey field. + /// Clearing an idempotencyKey is useful for debounce or cancelling child runs. + /// This is a workaround because Prisma doesn't support partial indexes. + inactiveIdempotencyKey String? + + /// GROUP (A) FK-FREE: RUN-type completing run, scalar only (a LEGACY token can be completed by a NEW run). + completedByTaskRunId String? @unique + + /// If it's a DATETIME type waitpoint, this is the date. + /// If it's a MANUAL waitpoint, this can be set as the `timeout`. + completedAfter DateTime? + + /// GROUP (A) FK-FREE: BATCH-type completing batch, scalar only. + completedByBatchId String? + + // GROUP (A) FK-FREE back-refs dropped: blocking edges live in TaskRunWaitpoint; + // connectedRuns -> WaitpointRunConnection; completing snapshots -> CompletedWaitpoint. + + /// When completed, an output can be stored here + output String? 
+ outputType String @default("application/json") + outputIsError Boolean @default(false) + + // scalarized control-plane FK (was `project @relation`) + projectId String + + // scalarized control-plane FK (was `environment @relation`) + environmentId String + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + /// Tags for this waitpoint, stored inline as a string array. + /// Denormalized column that holds the raw tags + tags String[] + + /// Quickly find an idempotent waitpoint + @@unique([environmentId, idempotencyKey]) + /// Quickly find a batch waitpoint + @@index([completedByBatchId]) + /// Used on the Waitpoint dashboard pages + /// Time period filtering + @@index([environmentId, type, createdAt(sort: Desc)]) + /// Status filtering + @@index([environmentId, type, status]) + /// For the waitpoint token dashboard page + @@index([environmentId, type, id(sort: Desc)]) +} + +enum WaitpointType { + RUN + DATETIME + MANUAL + BATCH +} + +enum WaitpointStatus { + PENDING + COMPLETED +} + +model TaskRunWaitpoint { + id String @id @default(cuid()) + + /// GROUP (A) FK-FREE: the block edge → blocked run (was a TaskRun FK). + taskRunId String + + /// GROUP (A) FK-FREE: the block edge → its waitpoint (was a Waitpoint FK). + waitpointId String + + // scalarized control-plane FK (was a project FK) + projectId String + + /// This span id is completed when the waitpoint is completed. This is used with cached runs (idempotent) + spanIdToComplete String? + + /// GROUP (A) FK-FREE: associated batch (was a BatchTaskRun FK). + batchId String? + //if there's an associated batch and this isn't set it's for the entire batch + //if it is set, it's a specific run in the batch + batchIndex Int?
+ + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + /// There are two constraints, the one below and also one that Prisma doesn't support + /// The second one implemented in SQL only prevents a TaskRun + Waitpoint with a null batchIndex + @@unique([taskRunId, waitpointId, batchIndex]) + @@index([taskRunId]) + @@index([waitpointId]) +} + +/// GROUP (A) FK-FREE explicit join model — replaces the implicit M2M +/// `_WaitpointRunConnections` (TaskRun.connectedWaitpoints ↔ Waitpoint.connectedRuns). +/// Scalar columns only, no FK, so a run↔waitpoint connection may straddle DBs. +model WaitpointRunConnection { + id String @id @default(cuid()) + + taskRunId String + waitpointId String + + @@unique([taskRunId, waitpointId]) + @@index([taskRunId]) + @@index([waitpointId]) +} + +/// GROUP (A) FK-FREE explicit join model — replaces the implicit M2M +/// `_completedWaitpoints` (TaskRunExecutionSnapshot.completedWaitpoints ↔ Waitpoint). +/// Scalar columns only, no FK, so a NEW snapshot may record a LEGACY token. +model CompletedWaitpoint { + id String @id @default(cuid()) + + snapshotId String + waitpointId String + + @@unique([snapshotId, waitpointId]) + @@index([snapshotId]) + @@index([waitpointId]) +} + +model WaitpointTag { + id String @id @default(cuid()) + name String + + // scalarized control-plane FK (was `environment @relation`) + environmentId String + + // scalarized control-plane FK (was `project @relation`) + projectId String + + createdAt DateTime @default(now()) + + @@unique([environmentId, name]) +} + +model TaskRunTag { + id String @id @default(cuid()) + name String + + friendlyId String @unique + + // scalarized control-plane FK (was `project @relation`) + projectId String + + createdAt DateTime @default(now()) + + @@unique([projectId, name]) + //Makes run filtering by tag faster + @@index([name, id]) +} + +/// This is used for triggerAndWait and batchTriggerAndWait. 
The taskRun is the child task, it points at a parent attempt or a batch +model TaskRunDependency { + id String @id @default(cuid()) + + /// GROUP (B) KEEP: the child run (co-resident) + taskRun TaskRun @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + taskRunId String @unique + + /// GROUP (B) KEEP + checkpointEvent CheckpointRestoreEvent? @relation(fields: [checkpointEventId], references: [id], onDelete: Cascade, onUpdate: Cascade) + checkpointEventId String? @unique + + /// GROUP (B) KEEP: an attempt that is dependent on this task run. + dependentAttempt TaskRunAttempt? @relation(fields: [dependentAttemptId], references: [id]) + dependentAttemptId String? + + /// GROUP (B) KEEP: a batch run that is dependent on this task run + dependentBatchRun BatchTaskRun? @relation("dependentBatchRun", fields: [dependentBatchRunId], references: [id]) + dependentBatchRunId String? + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + resumedAt DateTime? + + @@index([dependentAttemptId]) + @@index([dependentBatchRunId]) +} + +model TaskRunAttempt { + id String @id @default(cuid()) + number Int @default(0) + + friendlyId String @unique + + /// GROUP (B) KEEP: the owning run (co-resident) + taskRun TaskRun @relation("attempts", fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + taskRunId String + + // scalarized control-plane FK (was `backgroundWorker @relation`) + backgroundWorkerId String + + // scalarized control-plane FK (was `backgroundWorkerTask @relation`) + backgroundWorkerTaskId String + + // scalarized control-plane FK (was `runtimeEnvironment @relation`) + runtimeEnvironmentId String + + // scalarized control-plane FK (was `queue @relation`) + queueId String + + status TaskRunAttemptStatus @default(PENDING) + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + startedAt DateTime? + completedAt DateTime? + + usageDurationMs Int @default(0) + + error Json? 
+ output String? + outputType String @default("application/json") + + dependencies TaskRunDependency[] + batchDependencies BatchTaskRun[] + + checkpoints Checkpoint[] + batchTaskRunItems BatchTaskRunItem[] + CheckpointRestoreEvent CheckpointRestoreEvent[] + childRuns TaskRun[] @relation("TaskParentRunAttempt") + + @@unique([taskRunId, number]) + @@index([taskRunId]) +} + +enum TaskRunAttemptStatus { + /// NON-FINAL + PENDING + EXECUTING + PAUSED + /// FINAL + FAILED + CANCELED + COMPLETED +} + +model BatchTaskRun { + id String @id @default(cuid()) + friendlyId String @unique + idempotencyKey String? + idempotencyKeyExpiresAt DateTime? + status BatchTaskRunStatus @default(PENDING) + runtimeEnvironmentId String + /// This only includes new runs, not idempotent runs. + runs TaskRun[] + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + // new columns + /// Friendly IDs + runIds String[] @default([]) + runCount Int @default(0) + payload String? + payloadType String @default("application/json") + options Json? + batchVersion String @default("v1") + + //engine v2 + /// GROUP (A) FK-FREE: specific run blockers. Was `runsBlocked TaskRunWaitpoint[]`; + /// block edges are the rows of `TaskRunWaitpoint` keyed by the scalar `batchId`. + + /// GROUP (A) FK-FREE: waitpoints that are blocked by this batch. + /// Was `waitpoints Waitpoint[]`; the completion back-ref lives on + /// `Waitpoint.completedByBatchId` (scalar). + + // This is for v3 batches + /// sealed is set to true once no more items can be added to the batch + sealed Boolean @default(false) + sealedAt DateTime? + /// this is the expected number of items in the batch + expectedCount Int @default(0) + /// this is the completed number of items in the batch. once this reaches expectedCount, and the batch is sealed, the batch is considered completed + completedCount Int @default(0) + completedAt DateTime? + resumedAt DateTime? 
+ + /// this is used to be able to "seal" this BatchTaskRun when all of the runs have been triggered asynchronously, and using the "parallel" processing strategy + processingJobsCount Int @default(0) + processingJobsExpectedCount Int @default(0) + + /// optional token that can be used to authenticate the task run + oneTimeUseToken String? + + // Run Engine v2 batch queue fields + /// When processing started (status changed to PROCESSING) + processingStartedAt DateTime? + /// When processing completed (all items processed) + processingCompletedAt DateTime? + /// Count of successfully created runs + successfulRunCount Int? + /// Count of failed run creations + failedRunCount Int? + /// Detailed failure records + errors BatchTaskRunError[] + + ///all the below properties are engine v1 only + items BatchTaskRunItem[] + taskIdentifier String? + /// GROUP (B) KEEP + checkpointEvent CheckpointRestoreEvent? @relation(fields: [checkpointEventId], references: [id], onDelete: Cascade, onUpdate: Cascade) + checkpointEventId String? @unique + /// GROUP (B) KEEP + dependentTaskAttempt TaskRunAttempt? @relation(fields: [dependentTaskAttemptId], references: [id], onDelete: Cascade, onUpdate: Cascade) + dependentTaskAttemptId String? 
+ runDependencies TaskRunDependency[] @relation("dependentBatchRun") + + @@unique([oneTimeUseToken]) + ///this is used for all engine versions + @@unique([runtimeEnvironmentId, idempotencyKey]) + @@index([dependentTaskAttemptId]) + // This is for the batch list dashboard page + @@index([runtimeEnvironmentId, id(sort: Desc)]) +} + +enum BatchTaskRunStatus { + PENDING + PROCESSING + COMPLETED + PARTIAL_FAILED + ABORTED +} + +///Used in engine V1 only +model BatchTaskRunItem { + id String @id @default(cuid()) + + status BatchTaskRunItemStatus @default(PENDING) + + /// GROUP (B) KEEP + batchTaskRun BatchTaskRun @relation(fields: [batchTaskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + batchTaskRunId String + + /// GROUP (B) KEEP + taskRun TaskRun @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + taskRunId String + + /// GROUP (B) KEEP + taskRunAttempt TaskRunAttempt? @relation(fields: [taskRunAttemptId], references: [id], onDelete: SetNull, onUpdate: Cascade) + taskRunAttemptId String? + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + completedAt DateTime? + + @@unique([batchTaskRunId, taskRunId]) + @@index([taskRunAttemptId], map: "idx_batchtaskrunitem_taskrunattempt") + @@index([taskRunId], map: "idx_batchtaskrunitem_taskrun") +} + +enum BatchTaskRunItemStatus { + PENDING + FAILED + CANCELED + COMPLETED +} + +/// Track individual run creation failures in batch processing (Run Engine v2) +model BatchTaskRunError { + id String @id @default(cuid()) + /// GROUP (B) KEEP + batchTaskRun BatchTaskRun @relation(fields: [batchTaskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + batchTaskRunId String + + /// Which item in the batch (0-based index) + index Int + /// The task identifier that was being triggered + taskIdentifier String + /// The payload that failed (JSON, may be truncated) + payload String? + /// The options that were used + options Json? 
+ /// Error message + error String + /// Error code if available + errorCode String? + + createdAt DateTime @default(now()) + + @@unique([batchTaskRunId, index]) + @@index([batchTaskRunId]) +} + +model Checkpoint { + id String @id @default(cuid()) + + friendlyId String @unique + + type CheckpointType + location String + imageRef String + reason String? + metadata String? + + events CheckpointRestoreEvent[] + + /// GROUP (B) KEEP + run TaskRun @relation(fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade) + runId String + + /// GROUP (B) KEEP + attempt TaskRunAttempt @relation(fields: [attemptId], references: [id], onDelete: Cascade, onUpdate: Cascade) + attemptId String + attemptNumber Int? + + // scalarized control-plane FK (was `project @relation`) + projectId String + + // scalarized control-plane FK (was `runtimeEnvironment @relation`) + runtimeEnvironmentId String + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@index([attemptId]) + @@index([runId]) +} + +enum CheckpointType { + DOCKER + KUBERNETES +} + +model CheckpointRestoreEvent { + id String @id @default(cuid()) + + type CheckpointRestoreEventType + reason String? + metadata String? + + /// GROUP (B) KEEP + checkpoint Checkpoint @relation(fields: [checkpointId], references: [id], onDelete: Cascade, onUpdate: Cascade) + checkpointId String + + /// GROUP (B) KEEP + run TaskRun @relation(fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade) + runId String + + /// GROUP (B) KEEP + attempt TaskRunAttempt @relation(fields: [attemptId], references: [id], onDelete: Cascade, onUpdate: Cascade) + attemptId String + + // scalarized control-plane FK (was `project @relation`) + projectId String + + // scalarized control-plane FK (was `runtimeEnvironment @relation`) + runtimeEnvironmentId String + + taskRunDependency TaskRunDependency? + batchTaskRunDependency BatchTaskRun? 
+ + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@index([checkpointId]) + @@index([runId]) +} + +enum CheckpointRestoreEventType { + CHECKPOINT + RESTORE +} diff --git a/internal-packages/run-ops-database/src/index.ts b/internal-packages/run-ops-database/src/index.ts new file mode 100644 index 00000000000..ca13d150430 --- /dev/null +++ b/internal-packages/run-ops-database/src/index.ts @@ -0,0 +1,4 @@ +// The generated client class is named PrismaClient in BOTH packages; re-export it +// here under a distinct nominal alias so a consumer can import RunOpsPrismaClient +// and @trigger.dev/database's PrismaClient in the same module without a clash. +export { PrismaClient as RunOpsPrismaClient } from "../generated/run-ops/index.js"; diff --git a/internal-packages/run-ops-database/tsconfig.json b/internal-packages/run-ops-database/tsconfig.json new file mode 100644 index 00000000000..fda3a5ed806 --- /dev/null +++ b/internal-packages/run-ops-database/tsconfig.json @@ -0,0 +1,19 @@ +{ + "compilerOptions": { + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "isolatedModules": true, + "moduleResolution": "node", + "preserveWatchOutput": true, + "skipLibCheck": true, + "noEmit": true, + "strict": true + }, + "exclude": [ + "node_modules", + "dist", + "generated", + "**/*.test.ts", + "vitest.config.ts" + ] +} diff --git a/internal-packages/run-ops-database/vitest.config.ts b/internal-packages/run-ops-database/vitest.config.ts new file mode 100644 index 00000000000..e07f05e842b --- /dev/null +++ b/internal-packages/run-ops-database/vitest.config.ts @@ -0,0 +1,10 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + include: ["**/*.test.ts"], + globals: true, + isolate: true, + testTimeout: 10_000, + }, +}); diff --git a/internal-packages/testcontainers/package.json b/internal-packages/testcontainers/package.json index b3ab7ce5dc4..291c1314c18 100644 --- 
a/internal-packages/testcontainers/package.json +++ b/internal-packages/testcontainers/package.json @@ -19,6 +19,7 @@ "ioredis": "~5.6.0" }, "devDependencies": { + "@internal/run-ops-database": "workspace:*", "@testcontainers/postgresql": "^11.14.0", "@testcontainers/redis": "^11.14.0", "@trigger.dev/core": "workspace:*", diff --git a/internal-packages/testcontainers/src/heteroDedicated.test.ts b/internal-packages/testcontainers/src/heteroDedicated.test.ts new file mode 100644 index 00000000000..87f247c5eef --- /dev/null +++ b/internal-packages/testcontainers/src/heteroDedicated.test.ts @@ -0,0 +1,26 @@ +import { expect } from "vitest"; +import { heteroRunOpsPostgresTest } from "./index.js"; + +// The dedicated subset (NEW/PG17) has run-ops tables (TaskRun) but NOT control-plane tables +// (Organization); the legacy side (PG14) keeps the full control-plane schema. +heteroRunOpsPostgresTest( + "NEW (PG17) side has run-ops tables but NOT control-plane tables", + async ({ prisma14, prisma17 }) => { + // Cast regclass -> text: Prisma can't deserialize a bare `regclass` column. + const regclass = async ( + p: { $queryRawUnsafe: (q: string) => Promise }, + table: string + ): Promise => { + const rows = (await p.$queryRawUnsafe( + `SELECT to_regclass('"${table}"')::text AS t` + )) as Array<{ t: string | null }>; + return rows[0]?.t ?? null; + }; + + expect(await regclass(prisma14, "Organization")).not.toBeNull(); + expect(await regclass(prisma17, "TaskRun")).not.toBeNull(); + expect(await regclass(prisma17, "Organization")).toBeNull(); + }, + // Booting two PG containers and pushing two schemas on first run far exceeds vitest's 5s default. 
+ 120_000 +); diff --git a/internal-packages/testcontainers/src/index.ts b/internal-packages/testcontainers/src/index.ts index b8071261afe..365d6605afa 100644 --- a/internal-packages/testcontainers/src/index.ts +++ b/internal-packages/testcontainers/src/index.ts @@ -2,6 +2,7 @@ import { type ClickHouseClient, createClient } from "@clickhouse/client"; import { type StartedPostgreSqlContainer, PostgreSqlContainer } from "@testcontainers/postgresql"; import type { StartedRedisContainer } from "@testcontainers/redis"; import { PrismaClient } from "@trigger.dev/database"; +import { RunOpsPrismaClient } from "@internal/run-ops-database"; import Redis, { type RedisOptions } from "ioredis"; import path from "path"; import { type StartedNetwork, Network } from "testcontainers"; @@ -16,11 +17,11 @@ import { getTaskMetadata, logCleanup, logSetup } from "./logs"; import { type MinIOConnectionConfig, type StartedMinIOContainer, MinIOContainer } from "./minio"; import { createClickHouseContainer, - createElectricContainer, createPostgresContainer, createRedisContainer, postgresUriWithDatabase, pushDatabaseSchema, + pushRunOpsSchema, useContainer, withCiResourceLimits, withContainerSetup, @@ -42,14 +43,8 @@ type RedisContext = NetworkContext & { redisOptions: RedisOptions; }; -type ElectricContext = { - electricOrigin: string; -}; - export type ContainerContext = NetworkContext & PostgresContext & RedisContext & ClickhouseContext; export type PostgresAndRedisContext = NetworkContext & PostgresContext & RedisContext; -export type ContainerWithElectricAndRedisContext = ContainerContext & ElectricContext; -export type ContainerWithElectricContext = NetworkContext & PostgresContext & ElectricContext; export type { StartedClickHouseContainer, @@ -82,7 +77,6 @@ export const network = async ({ task }: TestContext, use: Use) = try { await use(network); } finally { - // Make sure to stop the network after use await logCleanup("network", network.stop(), metadata); } }; @@ -167,6 +161,70 @@ 
const getWorkerPostgresContainer = () => { return workerPostgresContainer; }; +// --- Heterogeneous PG14 + PG17 fixture --- +// `und-x-icu` is a predefined ICU collation available by default in BOTH PG14 and PG17, so it is the +// version-symmetric per-column source of truth for cross-version sort equality. +export const HETERO_PINNED_ICU_COLLATION = "und-x-icu"; + +// PG17 worker singleton mirroring getWorkerPostgresContainer. PG17 supports the ICU cluster locale +// provider (PG14 does not - it arrived in PG15), so only this side sets the cluster locale; the real +// cross-version guarantee is the per-column COLLATE in the proof test. +let workerPostgresContainer17: Promise | undefined; +const getWorkerPostgresContainer17 = () => { + if (!workerPostgresContainer17) { + workerPostgresContainer17 = (async () => { + const container = await withCiResourceLimits(new PostgreSqlContainer("docker.io/postgres:17")) + .withCommand(["-c", "listen_addresses=*", "-c", "wal_level=logical"]) + .withEnvironment({ + POSTGRES_INITDB_ARGS: "--locale-provider=icu --icu-locale=en-US --encoding=UTF8", + }) + .start(); + const admin = new PrismaClient({ + datasources: { + db: { url: postgresUriWithDatabase(container.getConnectionUri(), "postgres") }, + }, + }); + await admin.$executeRawUnsafe(`CREATE DATABASE "${POSTGRES_TEMPLATE_DB}"`); + await admin.$disconnect(); + await pushDatabaseSchema( + postgresUriWithDatabase(container.getConnectionUri(), POSTGRES_TEMPLATE_DB) + ); + return container; + })(); + } + return workerPostgresContainer17; +}; + +// PG17 worker singleton for the DEDICATED run-ops fixture. This is a SEPARATE container from +// getWorkerPostgresContainer17 on purpose: that one's template db has the FULL @trigger.dev/database +// schema pushed (consumed by heteroPostgresTest), whereas this one pushes the dedicated run-ops +// SUBSET schema. Keeping them apart avoids a schema collision in the shared template db. 
+let runOpsWorkerPostgresContainer17: Promise | undefined; +const getRunOpsWorkerPostgresContainer17 = () => { + if (!runOpsWorkerPostgresContainer17) { + runOpsWorkerPostgresContainer17 = (async () => { + const container = await withCiResourceLimits(new PostgreSqlContainer("docker.io/postgres:17")) + .withCommand(["-c", "listen_addresses=*", "-c", "wal_level=logical"]) + .withEnvironment({ + POSTGRES_INITDB_ARGS: "--locale-provider=icu --icu-locale=en-US --encoding=UTF8", + }) + .start(); + const admin = new PrismaClient({ + datasources: { + db: { url: postgresUriWithDatabase(container.getConnectionUri(), "postgres") }, + }, + }); + await admin.$executeRawUnsafe(`CREATE DATABASE "${POSTGRES_TEMPLATE_DB}"`); + await admin.$disconnect(); + await pushRunOpsSchema( + postgresUriWithDatabase(container.getConnectionUri(), POSTGRES_TEMPLATE_DB) + ); + return container; + })(); + } + return runOpsWorkerPostgresContainer17; +}; + // Per test: clone a fresh database from the template (fast filesystem copy), then hand back a view // of the shared container whose connection points at the clone. This keeps prisma AND any code that // reads postgresContainer.getConnectionUri()/getDatabase() (e.g. logical replication) on the SAME @@ -264,6 +322,129 @@ export const postgresTest = test.extend({ prisma: prismaFromContainer, }); +type HeteroPostgresTestContext = { + // PG14 (legacy / control-plane DB analog) + postgresContainer14: StartedPostgreSqlContainer; + prisma14: PrismaClient; + uri14: string; + // PG17 (new / dedicated run-ops DB analog) + postgresContainer17: StartedPostgreSqlContainer; + prisma17: PrismaClient; + uri17: string; + pinnedCollation: string; // === HETERO_PINNED_ICU_COLLATION +}; + +// Hands a test two prisma clients + two connection URIs (one PG14, one PG17) over the same migrated +// schema, each on a fresh per-test clone of its version's template. Consumed only by explicit +// cross-version test files - never wired into a product fixture. 
+export const heteroPostgresTest = test.extend({ + postgresContainer14: async ({}, use) => { + await use(await getWorkerPostgresContainer()); + }, + postgresContainer17: async ({}, use) => { + await use(await getWorkerPostgresContainer17()); + }, + uri14: async ({ postgresContainer14 }, use) => { + const baseUri = postgresContainer14.getConnectionUri(); + const cloneDb = `hetero14_${pgCloneCounter++}`; + await createDatabaseFromTemplate(baseUri, cloneDb); + try { + await use(postgresUriWithDatabase(baseUri, cloneDb)); + } finally { + await dropCloneDatabase(baseUri, cloneDb); + } + }, + uri17: async ({ postgresContainer17 }, use) => { + const baseUri = postgresContainer17.getConnectionUri(); + const cloneDb = `hetero17_${pgCloneCounter++}`; + await createDatabaseFromTemplate(baseUri, cloneDb); + try { + await use(postgresUriWithDatabase(baseUri, cloneDb)); + } finally { + await dropCloneDatabase(baseUri, cloneDb); + } + }, + prisma14: async ({ uri14 }, use) => { + const prisma = new PrismaClient({ datasources: { db: { url: uri14 } } }); + try { + await use(prisma); + } finally { + await prisma.$disconnect(); + } + }, + prisma17: async ({ uri17 }, use) => { + const prisma = new PrismaClient({ datasources: { db: { url: uri17 } } }); + try { + await use(prisma); + } finally { + await prisma.$disconnect(); + } + }, + pinnedCollation: async ({}, use) => { + await use(HETERO_PINNED_ICU_COLLATION); + }, +}); + +type HeteroRunOpsPostgresTestContext = { + // PG14 (legacy / control-plane DB analog) — full @trigger.dev/database control-plane schema. + postgresContainer14: StartedPostgreSqlContainer; + prisma14: PrismaClient; + uri14: string; + // PG17 (new / dedicated run-ops DB) — the @internal/run-ops-database SUBSET schema. 
+ postgresContainer17: StartedPostgreSqlContainer; + prisma17: RunOpsPrismaClient; + uri17: string; +}; + +// Additive sibling of heteroPostgresTest for the dedicated run-ops migration: prisma14 is the full +// control-plane schema on PG14 (legacy), prisma17 is a RunOpsPrismaClient over the dedicated SUBSET +// schema on a SEPARATE PG17 container. Lets a test prove the two sides carry different schemas +// without disturbing the existing heteroPostgresTest (which keeps the full schema on both sides). +export const heteroRunOpsPostgresTest = test.extend({ + postgresContainer14: async ({}, use) => { + await use(await getWorkerPostgresContainer()); + }, + postgresContainer17: async ({}, use) => { + await use(await getRunOpsWorkerPostgresContainer17()); + }, + uri14: async ({ postgresContainer14 }, use) => { + const baseUri = postgresContainer14.getConnectionUri(); + const cloneDb = `heteroRunOps14_${pgCloneCounter++}`; + await createDatabaseFromTemplate(baseUri, cloneDb); + try { + await use(postgresUriWithDatabase(baseUri, cloneDb)); + } finally { + await dropCloneDatabase(baseUri, cloneDb); + } + }, + uri17: async ({ postgresContainer17 }, use) => { + const baseUri = postgresContainer17.getConnectionUri(); + const cloneDb = `heteroRunOps17_${pgCloneCounter++}`; + await createDatabaseFromTemplate(baseUri, cloneDb); + try { + await use(postgresUriWithDatabase(baseUri, cloneDb)); + } finally { + await dropCloneDatabase(baseUri, cloneDb); + } + }, + prisma14: async ({ uri14 }, use) => { + const prisma = new PrismaClient({ datasources: { db: { url: uri14 } } }); + try { + await use(prisma); + } finally { + await prisma.$disconnect(); + } + }, + prisma17: async ({ uri17 }, use) => { + const prisma = new RunOpsPrismaClient({ datasources: { db: { url: uri17 } } }); + try { + await use(prisma); + } finally { + await prisma.$disconnect(); + } + }, +}); + export const redisContainer = async ( { network, task }: { network: StartedNetwork } & TestContext, use: Use @@ -582,24 
+763,6 @@ export const replicationContainerTest = test.extend({ - network, - postgresContainer, - prisma, - electricOrigin, -}); - -export const containerWithElectricAndRedisTest = test.extend({ - network, - postgresContainer, - prisma, - redisContainer, - redisOptions, - electricOrigin, - clickhouseContainer, - clickhouseClient, -}); - // Boot minio once per worker; reset the bucket per test (auto fixture). const bootWorkerMinio = async ({}, use: Use) => { const container = await withCiResourceLimits(new MinIOContainer()).start(); diff --git a/internal-packages/testcontainers/src/utils.ts b/internal-packages/testcontainers/src/utils.ts index 21509208a62..bece1dab99a 100644 --- a/internal-packages/testcontainers/src/utils.ts +++ b/internal-packages/testcontainers/src/utils.ts @@ -3,6 +3,7 @@ import type { StartedPostgreSqlContainer } from "@testcontainers/postgresql"; import { PostgreSqlContainer } from "@testcontainers/postgresql"; import type { StartedRedisContainer } from "@testcontainers/redis"; import { RedisContainer } from "@testcontainers/redis"; +import { PrismaClient } from "@trigger.dev/database"; import { tryCatch } from "@trigger.dev/core"; import Redis from "ioredis"; import path from "path"; @@ -26,26 +27,74 @@ export function postgresUriWithDatabase(uri: string, database: string): string { export async function pushDatabaseSchema(databaseUrl: string) { const databasePath = path.resolve(__dirname, "../../database"); + return pushPrismaSchema({ + prismaBin: `${databasePath}/node_modules/.bin/prisma`, + schemaPath: `${databasePath}/prisma/schema.prisma`, + // The full @trigger.dev/database datasource reads DATABASE_URL/DIRECT_URL. + env: { DATABASE_URL: databaseUrl, DIRECT_URL: databaseUrl }, + }); +} + +/** + * Pushes the DEDICATED run-ops subset schema (@internal/run-ops-database) into the database at + * `databaseUrl`. 
The run-ops datasource reads RUN_OPS_DATABASE_URL/RUN_OPS_DATABASE_DIRECT_URL, and + * its schema is a subset of the control-plane schema (run-ops tables, no Organization/Project/etc). + */ +export async function pushRunOpsSchema(databaseUrl: string) { + // Resolve the schema (and the package's own prisma binary) through the @internal/run-ops-database + // package so this keeps working regardless of where pnpm hoists it. + const schemaPath = require.resolve("@internal/run-ops-database/prisma/schema.prisma"); + const runOpsPackagePath = path.resolve(schemaPath, "../.."); + + const result = await pushPrismaSchema({ + prismaBin: `${runOpsPackagePath}/node_modules/.bin/prisma`, + schemaPath, + env: { RUN_OPS_DATABASE_URL: databaseUrl, RUN_OPS_DATABASE_DIRECT_URL: databaseUrl }, + }); + + // `db push` derives DDL from the schema datamodel and so cannot create the SQL-only partial unique + // index Prisma can't express (the NULL-batchIndex dedup). Apply it here so the test DB matches what + // `migrate deploy` builds in production; otherwise ON CONFLICT DO NOTHING can't dedupe a re-blocked + // NULL-batchIndex edge and the idempotency contract goes untested. + await applyRunOpsSqlOnlyIndexes(databaseUrl); + + return result; +} + +async function applyRunOpsSqlOnlyIndexes(databaseUrl: string) { + // Reuse @trigger.dev/database's PrismaClient purely as a raw-SQL connection (the established + // primitive in this package — see createDatabaseFromTemplate). No model access, so the subset + // schema mismatch is irrelevant. 
+ const client = new PrismaClient({ datasources: { db: { url: databaseUrl } } }); + try { + await client.$executeRawUnsafe( + `CREATE UNIQUE INDEX IF NOT EXISTS "TaskRunWaitpoint_taskRunId_waitpointId_batchIndex_null_key" ON "public"."TaskRunWaitpoint"("taskRunId", "waitpointId") WHERE "batchIndex" IS NULL` + ); + } finally { + await client.$disconnect(); + } +} + +async function pushPrismaSchema({ + prismaBin, + schemaPath, + env, +}: { + prismaBin: string; + schemaPath: string; + env: Record; +}) { // throwOnError is essential: without it tinyexec swallows a non-zero `prisma db push`, so a failed // push looks like success and only surfaces much later as a confusing downstream error. const result = await x( - `${databasePath}/node_modules/.bin/prisma`, - [ - "db", - "push", - "--force-reset", - "--accept-data-loss", - "--skip-generate", - "--schema", - `${databasePath}/prisma/schema.prisma`, - ], + prismaBin, + ["db", "push", "--force-reset", "--accept-data-loss", "--skip-generate", "--schema", schemaPath], { throwOnError: true, nodeOptions: { env: { ...process.env, - DATABASE_URL: databaseUrl, - DIRECT_URL: databaseUrl, + ...env, }, }, } @@ -83,12 +132,21 @@ function parsePositiveNumberEnv(name: string): number | undefined { return value; } -export async function createPostgresContainer(network: StartedNetwork) { - const container = await withCiResourceLimits(new PostgreSqlContainer("docker.io/postgres:14")) +export async function createPostgresContainer( + network: StartedNetwork, + opts?: { imageTag?: string; initdbArgs?: string } +) { + const imageTag = opts?.imageTag ?? 
"docker.io/postgres:14"; + let builder = withCiResourceLimits(new PostgreSqlContainer(imageTag)) .withNetwork(network) .withNetworkAliases("database") - .withCommand(["-c", "listen_addresses=*", "-c", "wal_level=logical"]) - .start(); + .withCommand(["-c", "listen_addresses=*", "-c", "wal_level=logical"]); + if (opts?.initdbArgs) { + // POSTGRES_INITDB_ARGS is the official postgres image hook for initdb flags + // (locale/collation provider). No dedicated PostgreSqlContainer method exists. + builder = builder.withEnvironment({ POSTGRES_INITDB_ARGS: opts.initdbArgs }); + } + const container = await builder.start(); await pushDatabaseSchema(container.getConnectionUri()); @@ -106,7 +164,6 @@ export async function createClickHouseContainer(network: StartedNetwork) { await client.ping(); - // Now we run the migrations const migrationsPath = path.resolve(__dirname, "../../clickhouse/schema"); await runClickhouseMigrations(client, migrationsPath); @@ -140,7 +197,6 @@ export async function createRedisContainer({ .withWaitStrategy(Wait.forLogMessage("Ready to accept connections")) .start(); - // Add a verification step const [error] = await tryCatch(verifyRedisConnection(startedContainer)); if (error) { @@ -193,33 +249,6 @@ async function verifyRedisConnection(container: StartedRedisContainer) { } } -export async function createElectricContainer( - postgresContainer: StartedPostgreSqlContainer, - network: StartedNetwork -) { - const databaseUrl = `postgresql://${postgresContainer.getUsername()}:${postgresContainer.getPassword()}@${postgresContainer.getIpAddress( - network.getName() - )}:5432/${postgresContainer.getDatabase()}?sslmode=disable`; - - const container = await withCiResourceLimits( - new GenericContainer( - "electricsql/electric:1.2.4@sha256:20da3d0b0e74926c5623392db67fd56698b9e374c4aeb6cb5cadeb8fea171c36" - ) - ) - .withExposedPorts(3000) - .withNetwork(network) - .withEnvironment({ - DATABASE_URL: databaseUrl, - ELECTRIC_INSECURE: "true", - }) - .start(); - - 
return { - container, - origin: `http://${container.getHost()}:${container.getMappedPort(3000)}`, - }; -} - export async function createMinIOContainer(network: StartedNetwork) { const container = await withCiResourceLimits(new MinIOContainer()) .withNetwork(network) diff --git a/package.json b/package.json index e3e134812a7..185d97cc7c6 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,7 @@ "build:force": "turbo run build --force", "build:db:seed": "turbo run build:db:seed", "db:migrate": "turbo run db:migrate:deploy generate", + "db:migrate:run-ops": "pnpm --filter @trigger.dev/database run db:migrate:run-ops", "db:seed": "turbo run db:seed", "db:studio": "turbo run db:studio", "db:populate": "turbo run db:populate", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9268e8f396e..d79a7e880cb 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1379,6 +1379,22 @@ importers: specifier: 6.0.1 version: 6.0.1 + internal-packages/run-ops-database: + dependencies: + '@prisma/client': + specifier: 6.14.0 + version: 6.14.0(prisma@6.14.0(magicast@0.3.5)(typescript@5.5.4))(typescript@5.5.4) + prisma: + specifier: 6.14.0 + version: 6.14.0(magicast@0.3.5)(typescript@5.5.4) + devDependencies: + rimraf: + specifier: 6.0.1 + version: 6.0.1 + vitest: + specifier: 4.1.7 + version: 4.1.7(@opentelemetry/api@1.9.1)(@types/node@20.14.14)(@vitest/coverage-v8@4.1.7)(vite@6.4.2(@types/node@20.14.14)(jiti@2.6.1)(lightningcss@1.29.2)(terser@5.46.1)(tsx@4.22.4)(yaml@2.9.0)) + internal-packages/run-store: dependencies: '@trigger.dev/core': @@ -1488,6 +1504,9 @@ importers: specifier: ~5.6.0 version: 5.6.1 devDependencies: + '@internal/run-ops-database': + specifier: workspace:* + version: link:../run-ops-database '@testcontainers/postgresql': specifier: ^11.14.0 version: 11.14.0 @@ -3043,8 +3062,12 @@ packages: resolution: {integrity: sha512-sIyFcoPZkTtNu9xFeEoynMef3bPJIAbOfUh+ueYcfhVl6xm2VRtMcMclSxmZCMnHHd4hlYKJeq/aggmBEWynww==} engines: {node: '>=18.0.0'} - 
'@babel/code-frame@7.29.7': - resolution: {integrity: sha512-Aup7aUOfpbAUg2ROOJN6Iw5f9DMBlzu0mIkm/malLQFN/YQgO48wCj0Kxa3sEHJvPVFg7siR+qRInwXd2qhQKw==} + '@babel/code-frame@7.24.7': + resolution: {integrity: sha512-BcYH1CVJBO9tvyIZ2jVeXgSIMvGZ2FDRvDdOIVQyuklNKSsx+eppDEBq/g47Ayw+RqNFE+URvOShmf+f/qwAlA==} + engines: {node: '>=6.9.0'} + + '@babel/code-frame@7.29.0': + resolution: {integrity: sha512-9NhCeYjq9+3uxgdtp20LSiJXJvN0FeCtNGpJxuMFZ1Kv3cWUNb6DOhJwUvcVCzKGR66cw4njwM6hrJLqgOwbcw==} engines: {node: '>=6.9.0'} '@babel/compat-data@7.22.9': @@ -3055,8 +3078,12 @@ packages: resolution: {integrity: sha512-2EENLmhpwplDux5PSsZnSbnSkB3tZ6QTksgO25xwEL7pIDcNOMhF5v/s6RzwjMZzZzw9Ofc30gHv5ChCC8pifQ==} engines: {node: '>=6.9.0'} - '@babel/generator@7.29.7': - resolution: {integrity: sha512-DkXD5OJQaAQIdZ1bt3UZdEnHAn9Imd3IVBdX03UFe+ony9Ojw5pzr9YVKGDY1jt+Gcn/FnGkNf8r+Vj5NOJWtQ==} + '@babel/generator@7.24.7': + resolution: {integrity: sha512-oipXieGC3i45Y1A41t4tAqpnEZWgB/lC6Ehh6+rOviR5XWpTtMmLN+fGjz9vOiNRt0p6RtO6DtD0pdU3vpqdSA==} + engines: {node: '>=6.9.0'} + + '@babel/generator@7.29.1': + resolution: {integrity: sha512-qsaF+9Qcm2Qv8SRIMMscAvG4O3lJ0F1GuMo5HR/Bp02LopNgnZBC/EkbevHFeGs4ls/oPz9v+Bsmzbkbe+0dUw==} engines: {node: '>=6.9.0'} '@babel/helper-annotate-as-pure@7.22.5': @@ -3137,6 +3164,10 @@ packages: resolution: {integrity: sha512-7pAjK0aSdxOwR+CcYAqgWOGy5dcfvzsTIfFTb2odQqW47MDfv14UaJDY6eng8ylM2EaeKXdxaSWESbkmaQHTmw==} engines: {node: '>=6.9.0'} + '@babel/highlight@7.24.7': + resolution: {integrity: sha512-EStJpq4OuY8xYfhGVXngigBJRWxftKX9ksiGDnmlY3o7B/V7KIAc9X4oiK87uPJSc/vs5L869bem5fhZa8caZw==} + engines: {node: '>=6.9.0'} + '@babel/parser@7.24.7': resolution: {integrity: sha512-9uUYRm6OqQrCqQdG1iCBwBPZgN8ciDBro2nIOFaiRz1/BCxaI7CNvQbDHvsArAC7Tw9Hda/B3U+6ui9u4HWXPw==} engines: {node: '>=6.0.0'} @@ -3212,8 +3243,12 @@ packages: resolution: {integrity: sha512-Q/N6JNWvIvPnLDvjlE1OUBLPQHH6l3CltCEsHIujp45zQUSSh8K+gHnaEX45yAT1nyngnINhvWtzN+Nb9D8RAQ==} engines: {node: '>=6.9.0'} - 
'@babel/template@7.29.7': - resolution: {integrity: sha512-puq+Gf35oI24FeN11LkoUQFqv9uwNeWpxXZi/Ji3rRIoKAzKnxRaZ+Gkj0vKS9ZCiTESfng1N9LyOyXvo+m+Gg==} + '@babel/template@7.24.7': + resolution: {integrity: sha512-jYqfPrU9JTF0PmPy1tLYHW4Mp4KlgxJD9l2nP9fD6yT/ICi554DmrWBAEYpIelzjHf1msDP3PxJIRt/nFNfBig==} + engines: {node: '>=6.9.0'} + + '@babel/template@7.28.6': + resolution: {integrity: sha512-YA6Ma2KsCdGb+WC6UpBVFJGXL58MDA6oyONbjyF/+5sBgxY/dwkhLogbMT2GXXyU84/IhRw/2D1Os1B/giz+BQ==} engines: {node: '>=6.9.0'} '@babel/traverse@7.27.0': @@ -11371,14 +11406,6 @@ packages: picomatch: optional: true - fdir@6.4.4: - resolution: {integrity: sha512-1NZP+GK4GfuAv3PqKvxQRDMjdSRZjnkq7KfhlNrCNNlZ0ygQFpebfrnfnq/W7fpUnAv9aGWmY1zKx7FYL3gwhg==} - peerDependencies: - picomatch: ^4.0.4 - peerDependenciesMeta: - picomatch: - optional: true - fdir@6.5.0: resolution: {integrity: sha512-tIbYtZbucOs0BRGqPJkshJUYdL+SDH7dVM8gjy+ERp3WAUjLEFJE+02kanyHtwjWOnwrKYBiwAmM0p4kLJAnXg==} engines: {node: '>=12.0.0'} @@ -11669,6 +11696,7 @@ packages: glob@11.1.0: resolution: {integrity: sha512-vuNwKSaKiqm7g0THUBu2x7ckSs3XJLXE+2ssL7/MfTGPLLcrJQ/4Uq1CjPTtO5cCIiRxqvN6Twy1qOwhL0Xjcw==} engines: {node: 20 || >=22} + deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. 
Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me hasBin: true glob@13.0.6: @@ -12382,6 +12410,11 @@ packages: resolution: {integrity: sha512-B7qPcEVE3NVkmSJbaYxvv4cHkVW7DQsZz13pUMrfS8z8Q/BuShN+gcTXrUlPiGqM2/t/EEaI030bpxMqY8gMlw==} engines: {node: '>= 10.16.0'} + jsesc@2.5.2: + resolution: {integrity: sha512-OYu7XEzjkCQ3C5Ps3QIZsQfNpqoJyZZA99wd9aWd05NCtC5pWOkShK2mkL6HXQR6/Cy2lbNdPlZBpuQHXE63gA==} + engines: {node: '>=4'} + hasBin: true + jsesc@3.0.2: resolution: {integrity: sha512-xKqzzWXDttJuOcawBt4KnKHHIf5oQ/Cxax+0PWFG+DFDgHNAdi+TXECADI+RYiFUMmx8792xsMbbgXj4CwnP4g==} engines: {node: '>=6'} @@ -18993,7 +19026,12 @@ snapshots: '@aws/lambda-invoke-store@0.2.1': {} - '@babel/code-frame@7.29.7': + '@babel/code-frame@7.24.7': + dependencies: + '@babel/highlight': 7.24.7 + picocolors: 1.1.1 + + '@babel/code-frame@7.29.0': dependencies: '@babel/helper-validator-identifier': 7.29.7 js-tokens: 4.0.0 @@ -19004,13 +19042,13 @@ snapshots: '@babel/core@7.22.17': dependencies: '@ampproject/remapping': 2.3.0 - '@babel/code-frame': 7.29.7 - '@babel/generator': 7.29.7 + '@babel/code-frame': 7.24.7 + '@babel/generator': 7.24.7 '@babel/helper-compilation-targets': 7.22.15 '@babel/helper-module-transforms': 7.22.17(@babel/core@7.22.17) '@babel/helpers': 7.22.15 '@babel/parser': 7.29.7 - '@babel/template': 7.29.7 + '@babel/template': 7.24.7 '@babel/traverse': 7.27.0 '@babel/types': 7.29.7 convert-source-map: 1.9.0 @@ -19021,7 +19059,14 @@ snapshots: transitivePeerDependencies: - supports-color - '@babel/generator@7.29.7': + '@babel/generator@7.24.7': + dependencies: + '@babel/types': 7.29.7 + '@jridgewell/gen-mapping': 0.3.13 + '@jridgewell/trace-mapping': 0.3.31 + jsesc: 2.5.2 + + '@babel/generator@7.29.1': dependencies: '@babel/parser': 7.29.7 '@babel/types': 7.29.7 @@ -19060,7 +19105,7 @@ snapshots: '@babel/helper-function-name@7.24.7': dependencies: - '@babel/template': 7.29.7 + '@babel/template': 7.24.7 '@babel/types': 7.29.7 
'@babel/helper-member-expression-to-functions@7.23.0': @@ -19113,12 +19158,19 @@ snapshots: '@babel/helpers@7.22.15': dependencies: - '@babel/template': 7.29.7 + '@babel/template': 7.24.7 '@babel/traverse': 7.27.0 '@babel/types': 7.29.7 transitivePeerDependencies: - supports-color + '@babel/highlight@7.24.7': + dependencies: + '@babel/helper-validator-identifier': 7.29.7 + chalk: 2.4.2 + js-tokens: 4.0.0 + picocolors: 1.1.1 + '@babel/parser@7.24.7': dependencies: '@babel/types': 7.29.7 @@ -19190,18 +19242,24 @@ snapshots: '@babel/runtime@7.28.4': {} - '@babel/template@7.29.7': + '@babel/template@7.24.7': + dependencies: + '@babel/code-frame': 7.24.7 + '@babel/parser': 7.29.7 + '@babel/types': 7.29.7 + + '@babel/template@7.28.6': dependencies: - '@babel/code-frame': 7.29.7 + '@babel/code-frame': 7.29.0 '@babel/parser': 7.29.7 '@babel/types': 7.29.7 '@babel/traverse@7.27.0': dependencies: - '@babel/code-frame': 7.29.7 - '@babel/generator': 7.29.7 + '@babel/code-frame': 7.29.0 + '@babel/generator': 7.29.1 '@babel/parser': 7.29.7 - '@babel/template': 7.29.7 + '@babel/template': 7.28.6 '@babel/types': 7.29.7 debug: 4.4.3(supports-color@10.0.0) globals: 11.12.0 @@ -23490,7 +23548,7 @@ snapshots: '@remix-run/dev@2.17.5(@remix-run/react@2.17.5(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(typescript@5.5.4))(@remix-run/serve@2.17.5(typescript@5.5.4))(@types/node@22.20.0)(bufferutil@4.0.9)(jiti@2.6.1)(lightningcss@1.29.2)(terser@5.46.1)(tsx@4.20.6)(typescript@5.5.4)(vite@6.4.2(@types/node@22.20.0)(jiti@2.6.1)(lightningcss@1.29.2)(terser@5.46.1)(tsx@4.20.6)(yaml@2.9.0))(yaml@2.9.0)': dependencies: '@babel/core': 7.22.17 - '@babel/generator': 7.29.7 + '@babel/generator': 7.24.7 '@babel/parser': 7.29.7 '@babel/plugin-syntax-decorators': 7.22.10(@babel/core@7.22.17) '@babel/plugin-syntax-jsx': 7.22.5(@babel/core@7.22.17) @@ -28601,10 +28659,6 @@ snapshots: optionalDependencies: picomatch: 4.0.4 - fdir@6.4.4(picomatch@4.0.4): - optionalDependencies: - picomatch: 4.0.4 - 
fdir@6.5.0(picomatch@4.0.4): optionalDependencies: picomatch: 4.0.4 @@ -29162,7 +29216,7 @@ snapshots: hast-util-to-jsx-runtime@2.3.6: dependencies: - '@types/estree': 1.0.8 + '@types/estree': 1.0.9 '@types/hast': 3.0.4 '@types/unist': 3.0.3 comma-separated-tokens: 2.0.3 @@ -29697,6 +29751,8 @@ snapshots: jsep@1.4.0: {} + jsesc@2.5.2: {} + jsesc@3.0.2: {} json-buffer@3.0.0: {} @@ -31551,7 +31607,7 @@ snapshots: parse-json@5.2.0: dependencies: - '@babel/code-frame': 7.29.7 + '@babel/code-frame': 7.24.7 error-ex: 1.3.2 json-parse-even-better-errors: 2.3.1 lines-and-columns: 1.2.4 @@ -33987,7 +34043,7 @@ snapshots: tinyglobby@0.2.13: dependencies: - fdir: 6.4.4(picomatch@4.0.4) + fdir: 6.5.0(picomatch@4.0.4) picomatch: 4.0.4 tinyglobby@0.2.16: From a6a955ae4636b3bd2f0befea523cd739c9047c56 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 1 Jul 2026 18:47:01 +0100 Subject: [PATCH 2/8] restore Electric testcontainer fixtures removed in the run-ops db foundation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The db-foundation change dropped createElectricContainer, the electricOrigin fixture, the ContainerWithElectric* context types, and the containerWithElectricTest / containerWithElectricAndRedisTest fixtures. That Electric infra is unrelated to the run-ops db split, is being sunset on its own track, and is still consumed by apps/webapp/test/realtimeClient.test.ts (which imports containerWithElectricAndRedisTest) — so its removal here also broke that import. Restore it verbatim; the new PG17 run-ops fixtures are unaffected. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../database/scripts/migrate-run-ops.mjs | 62 ------------------- internal-packages/testcontainers/src/index.ts | 25 ++++++++ internal-packages/testcontainers/src/utils.ts | 27 ++++++++ 3 files changed, 52 insertions(+), 62 deletions(-) delete mode 100644 internal-packages/database/scripts/migrate-run-ops.mjs diff --git a/internal-packages/database/scripts/migrate-run-ops.mjs b/internal-packages/database/scripts/migrate-run-ops.mjs deleted file mode 100644 index 7ec3dd996ed..00000000000 --- a/internal-packages/database/scripts/migrate-run-ops.mjs +++ /dev/null @@ -1,62 +0,0 @@ -// Apply Prisma migrations to the RUN-OPS database — the second physical DB in the run-ops -// split. The standard `db:migrate` only targets DATABASE_URL (the control-plane DB), so the -// run-ops DB must be migrated explicitly or its schema drifts (e.g. cross-DB FKs that were -// dropped on the control-plane DB linger on the run-ops DB and break inserts). -// -// The run-ops connection comes from TASK_RUN_DATABASE_URL / TASK_RUN_DATABASE_DIRECT_URL -// (set directly in deploy environments; read from the local .env otherwise). We then run -// `prisma migrate deploy` with DATABASE_URL/DIRECT_URL pointed at it. 
-import { spawnSync } from "node:child_process"; -import { readFileSync } from "node:fs"; -import { dirname, resolve } from "node:path"; -import { fileURLToPath } from "node:url"; - -const dbPackageRoot = resolve(dirname(fileURLToPath(import.meta.url)), ".."); - -function readFromEnvFiles(key) { - for (const file of [resolve(dbPackageRoot, ".env"), resolve(dbPackageRoot, "../../.env")]) { - let contents; - try { - contents = readFileSync(file, "utf8"); - } catch { - continue; - } - for (const line of contents.split("\n")) { - const match = line.match(/^\s*([A-Z0-9_]+)\s*=\s*(.*?)\s*$/); - if (!match || match[1] !== key) continue; - let value = match[2]; - if ( - (value.startsWith('"') && value.endsWith('"')) || - (value.startsWith("'") && value.endsWith("'")) - ) { - value = value.slice(1, -1); - } - if (value) return value; - } - } - return undefined; -} - -const resolveVar = (key) => process.env[key] || readFromEnvFiles(key); -const redact = (url) => url.replace(/:\/\/[^@]*@/, "://***@"); - -const databaseUrl = resolveVar("TASK_RUN_DATABASE_URL"); -const directUrl = resolveVar("TASK_RUN_DATABASE_DIRECT_URL") || databaseUrl; - -if (!databaseUrl) { - console.error( - "db:migrate:run-ops: TASK_RUN_DATABASE_URL is not set (checked env and .env). " + - "It is the run-ops database in the split — nothing to migrate without it." - ); - process.exit(1); -} - -console.log(`Applying Prisma migrations to the run-ops database (${redact(databaseUrl)})`); - -const result = spawnSync("prisma", ["migrate", "deploy"], { - cwd: dbPackageRoot, - stdio: "inherit", - env: { ...process.env, DATABASE_URL: databaseUrl, DIRECT_URL: directUrl }, -}); - -process.exit(result.status ?? 
1); diff --git a/internal-packages/testcontainers/src/index.ts b/internal-packages/testcontainers/src/index.ts index 365d6605afa..dd61f8f89b0 100644 --- a/internal-packages/testcontainers/src/index.ts +++ b/internal-packages/testcontainers/src/index.ts @@ -17,6 +17,7 @@ import { getTaskMetadata, logCleanup, logSetup } from "./logs"; import { type MinIOConnectionConfig, type StartedMinIOContainer, MinIOContainer } from "./minio"; import { createClickHouseContainer, + createElectricContainer, createPostgresContainer, createRedisContainer, postgresUriWithDatabase, @@ -43,8 +44,14 @@ type RedisContext = NetworkContext & { redisOptions: RedisOptions; }; +type ElectricContext = { + electricOrigin: string; +}; + export type ContainerContext = NetworkContext & PostgresContext & RedisContext & ClickhouseContext; export type PostgresAndRedisContext = NetworkContext & PostgresContext & RedisContext; +export type ContainerWithElectricAndRedisContext = ContainerContext & ElectricContext; +export type ContainerWithElectricContext = NetworkContext & PostgresContext & ElectricContext; export type { StartedClickHouseContainer, @@ -763,6 +770,24 @@ export const replicationContainerTest = test.extend({ + network, + postgresContainer, + prisma, + electricOrigin, +}); + +export const containerWithElectricAndRedisTest = test.extend({ + network, + postgresContainer, + prisma, + redisContainer, + redisOptions, + electricOrigin, + clickhouseContainer, + clickhouseClient, +}); + // Boot minio once per worker; reset the bucket per test (auto fixture). 
const bootWorkerMinio = async ({}, use: Use) => { const container = await withCiResourceLimits(new MinIOContainer()).start(); diff --git a/internal-packages/testcontainers/src/utils.ts b/internal-packages/testcontainers/src/utils.ts index bece1dab99a..505a919ed56 100644 --- a/internal-packages/testcontainers/src/utils.ts +++ b/internal-packages/testcontainers/src/utils.ts @@ -249,6 +249,33 @@ async function verifyRedisConnection(container: StartedRedisContainer) { } } +export async function createElectricContainer( + postgresContainer: StartedPostgreSqlContainer, + network: StartedNetwork +) { + const databaseUrl = `postgresql://${postgresContainer.getUsername()}:${postgresContainer.getPassword()}@${postgresContainer.getIpAddress( + network.getName() + )}:5432/${postgresContainer.getDatabase()}?sslmode=disable`; + + const container = await withCiResourceLimits( + new GenericContainer( + "electricsql/electric:1.2.4@sha256:20da3d0b0e74926c5623392db67fd56698b9e374c4aeb6cb5cadeb8fea171c36" + ) + ) + .withExposedPorts(3000) + .withNetwork(network) + .withEnvironment({ + DATABASE_URL: databaseUrl, + ELECTRIC_INSECURE: "true", + }) + .start(); + + return { + container, + origin: `http://${container.getHost()}:${container.getMappedPort(3000)}`, + }; +} + export async function createMinIOContainer(network: StartedNetwork) { const container = await withCiResourceLimits(new MinIOContainer()) .withNetwork(network) From 1c239fa247cbd8d152d5c4a57e6f4fbe046bbb69 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 1 Jul 2026 18:47:12 +0100 Subject: [PATCH 3/8] remove redundant migrate-run-ops.mjs script MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The run-ops database is migrated first-class by the @internal/run-ops-database package's own prisma migrate deploy (its schema.prisma datasource reads RUN_OPS_DATABASE_URL and it owns its own migrations dir), which the root db:migrate already sweeps via turbo. 
The bespoke script instead ran prisma migrate deploy from the control-plane database package against the run-ops DB, applying the full control-plane migration history rather than the run-ops subset — redundant with the native flow and wrong. Drop the script and its two script entries (root package.json, database package.json). Co-Authored-By: Claude Opus 4.8 (1M context) --- internal-packages/database/package.json | 1 - package.json | 1 - 2 files changed, 2 deletions(-) diff --git a/internal-packages/database/package.json b/internal-packages/database/package.json index 55ef7cf47de..ec0bc950b8c 100644 --- a/internal-packages/database/package.json +++ b/internal-packages/database/package.json @@ -18,7 +18,6 @@ "generate": "prisma generate", "db:migrate:dev:create": "prisma migrate dev --create-only", "db:migrate:deploy": "prisma migrate deploy", - "db:migrate:run-ops": "node scripts/migrate-run-ops.mjs", "db:push": "prisma db push", "db:studio": "prisma studio", "db:reset": "prisma migrate reset", diff --git a/package.json b/package.json index 185d97cc7c6..e3e134812a7 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,6 @@ "build:force": "turbo run build --force", "build:db:seed": "turbo run build:db:seed", "db:migrate": "turbo run db:migrate:deploy generate", - "db:migrate:run-ops": "pnpm --filter @trigger.dev/database run db:migrate:run-ops", "db:seed": "turbo run db:seed", "db:studio": "turbo run db:studio", "db:populate": "turbo run db:populate", From 1a5665c3825079ff40699b6ddb99a4e86b60ff4d Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Thu, 2 Jul 2026 10:17:46 +0100 Subject: [PATCH 4/8] docs(run-ops db): document per-table divergence from the control-plane schema Rework the run-ops schema comments so they read as documentation rather than working notes. 
Replaces the top-of-file header with prose describing the subset relationship and the three relation-handling rules, adds a short header above each model stating how it differs from its control-plane counterpart (which relations are scalar-only FKs, which are FK-free join models, which stay as real FKs), and removes the stale group-(A)/group-(B)/"drop" tags. Comments only; no model, field, type, attribute, index, or enum is changed. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../run-ops-database/prisma/schema.prisma | 242 +++++++++++------- 1 file changed, 148 insertions(+), 94 deletions(-) diff --git a/internal-packages/run-ops-database/prisma/schema.prisma b/internal-packages/run-ops-database/prisma/schema.prisma index 2d749f960bf..49842325c71 100644 --- a/internal-packages/run-ops-database/prisma/schema.prisma +++ b/internal-packages/run-ops-database/prisma/schema.prisma @@ -11,20 +11,35 @@ generator client { previewFeatures = ["metrics"] } -// // ───────────────────────────────────────────────────────────────────────────── -// Dedicated run-ops subgraph schema (the NEW DB). +// Dedicated run-ops database schema. // -// This is a copied SUBSET of the control-plane `@trigger.dev/database` schema, -// reproduced verbatim minus: -// 1. control-plane `@relation`s (org/project/environment/worker/queue) — -// demoted to plain scalar FK columns, no `@relation`, no FK. -// 2. group-(A) cross-DB-capable waitpoint-block `@relation`s — demoted to plain -// scalar columns and explicit FK-free join models (a block edge can straddle -// the two DBs, so it MUST NOT be enforced by an FK on the NEW DB). -// group-(B) co-resident run-ops→run-ops relations are KEPT as real FKs. -// ───────────────────────────────────────────────────────────────────────────── +// This schema holds the subset of the control-plane `@trigger.dev/database` +// models that describe a run's lifecycle. 
Each model reproduces its +// control-plane counterpart column-for-column; the only differences are in how +// relations are handled, according to three rules: +// +// 1. Relations that point at control-plane-owned models (organization, +// project, environment, background worker, queue, and the like) are not +// reproduced. Their foreign-key columns are kept as plain scalars, with no +// `@relation` and no database foreign key, because the referenced rows live +// in the control-plane database. +// +// 2. Waitpoint block and completion relations are foreign-key-free. A block or +// completion edge can connect a run and a waitpoint that live in different +// databases, so it must never be enforced by a foreign key here. These +// relations are represented by scalar columns plus the explicit join models +// `TaskRunWaitpoint`, `WaitpointRunConnection`, and `CompletedWaitpoint`. +// The latter two replace implicit many-to-many relations that exist in the +// control-plane schema; `TaskRunWaitpoint` already exists there as a model. +// +// 3. Relations that stay entirely within this database (run to root/parent +// run, run to attempt, run to batch, checkpoints, dependencies, and so on) +// are kept as real foreign keys, exactly as in the control plane. // +// Relation-only back-references to models outside this subset are dropped; the +// per-model notes below call out where that happens. +// ───────────────────────────────────────────────────────────────────────────── enum RuntimeEnvironmentType { PRODUCTION @@ -33,6 +48,15 @@ enum RuntimeEnvironmentType { PREVIEW } +/// Same columns and indexes as the control-plane `TaskRun`. The environment, +/// project, `lockedBy`, and `lockedToVersion` relations are scalar-only foreign +/// keys here.
The waitpoint completion/block relations +/// (`associatedWaitpoint`, `blockedByWaitpoints`, `connectedWaitpoints`) are +/// foreign-key-free: completion lives on `Waitpoint.completedByTaskRunId`, block +/// edges are rows of `TaskRunWaitpoint`, and connections are rows of +/// `WaitpointRunConnection`. The root/parent-run, parent-attempt, and batch +/// relations are kept as real foreign keys. Control-plane back-references to +/// tags, alerts, bulk-action items, and playground conversations are dropped. model TaskRun { id String @id @default(cuid()) @@ -64,12 +88,12 @@ model TaskRun { traceId String spanId String - // scalarized control-plane FK (was `runtimeEnvironment @relation`) + /// scalar FK (control-plane `runtimeEnvironment` relation) runtimeEnvironmentId String environmentType RuntimeEnvironmentType? - // scalarized control-plane FK (was `project @relation`) + /// scalar FK (control-plane `project` relation) projectId String organizationId String? @@ -117,11 +141,11 @@ model TaskRun { costInCents Float @default(0) baseCostInCents Float @default(0) - lockedAt DateTime? - // scalarized control-plane FK (was `lockedBy @relation`) + lockedAt DateTime? + /// scalar FK (control-plane `lockedBy` relation) lockedById String? - // scalarized control-plane FK (was `lockedToVersion @relation`) + /// scalar FK (control-plane `lockedToVersion` relation) lockedToVersionId String? /// The "priority" of the run. This is just a negative offset in ms for the queue timestamp @@ -140,29 +164,20 @@ model TaskRun { /// optional token that can be used to authenticate the task run oneTimeUseToken String? - /// GROUP (A) FK-FREE: when this run is finished, the waitpoint will be marked as - /// completed. Was `associatedWaitpoint Waitpoint? @relation("CompletingRun")`. - /// The completion back-ref lives on `Waitpoint.completedByTaskRunId` (scalar). - - /// GROUP (A) FK-FREE: if there are any blocked waitpoints, the run won't be - /// executed. 
Was `blockedByWaitpoints TaskRunWaitpoint[]`. Block edges are the - /// rows of `TaskRunWaitpoint` keyed by the scalar `taskRunId`. - - /// GROUP (A) FK-FREE: all waitpoints that blocked this run at some point, used - /// for display purposes. Was the implicit M2M - /// `connectedWaitpoints Waitpoint[] @relation("WaitpointRunConnections")`, - /// now the explicit FK-free join model `WaitpointRunConnection`. + // The control-plane waitpoint relations (associatedWaitpoint, blockedByWaitpoints, + // connectedWaitpoints) are foreign-key-free here: completion is stored on + // Waitpoint.completedByTaskRunId, block edges are rows of TaskRunWaitpoint, and + // connections are rows of WaitpointRunConnection. /// Where the logs are stored taskEventStore String @default("taskEvent") queueTimestamp DateTime? - batchItems BatchTaskRunItem[] - dependency TaskRunDependency? - // Renamed back-ref kept verbatim from control plane + batchItems BatchTaskRunItem[] + dependency TaskRunDependency? CheckpointRestoreEvent CheckpointRestoreEvent[] - executionSnapshots TaskRunExecutionSnapshot[] + executionSnapshots TaskRunExecutionSnapshot[] scheduleInstanceId String? scheduleId String? @@ -173,25 +188,25 @@ model TaskRun { replayedFromTaskRunFriendlyId String? - /// GROUP (B) KEEP: this represents the original task that was triggered outside of a Trigger.dev task + /// This represents the original task that was triggered outside of a Trigger.dev task rootTaskRun TaskRun? @relation("TaskRootRun", fields: [rootTaskRunId], references: [id], onDelete: SetNull, onUpdate: NoAction) rootTaskRunId String? /// The root run will have a list of all the descendant runs, children, grand children, etc. descendantRuns TaskRun[] @relation("TaskRootRun") - /// GROUP (B) KEEP: the immediate parent run of this task run + /// The immediate parent run of this task run parentTaskRun TaskRun? 
@relation("TaskParentRun", fields: [parentTaskRunId], references: [id], onDelete: SetNull, onUpdate: NoAction) parentTaskRunId String? /// The immediate child runs of this task run childRuns TaskRun[] @relation("TaskParentRun") - /// GROUP (B) KEEP: the immediate parent attempt of this task run + /// The immediate parent attempt of this task run parentTaskRunAttempt TaskRunAttempt? @relation("TaskParentRunAttempt", fields: [parentTaskRunAttemptId], references: [id], onDelete: SetNull, onUpdate: NoAction) parentTaskRunAttemptId String? - /// GROUP (B) KEEP: the batch run that this task run is a part of (co-resident) + /// The batch run that this task run is a part of batch BatchTaskRun? @relation(fields: [batchId], references: [id], onDelete: SetNull, onUpdate: NoAction) batchId String? @@ -329,6 +344,11 @@ enum RunEngineVersion { /// and prevent side effects like heartbeats failing a run that has progressed. /// It is optimised for performance and is designed to be cleared at some point, /// so there are no cascading relationships to other models. +/// +/// Same columns and indexes as the control-plane version. The `run` and +/// `checkpoint` relations are kept as real foreign keys. The `completedWaitpoints` +/// relation is foreign-key-free: it is represented by rows of the `CompletedWaitpoint` +/// join model instead. model TaskRunExecutionSnapshot { id String @id @default(cuid()) @@ -347,7 +367,7 @@ model TaskRunExecutionSnapshot { /// The previous snapshot ID previousSnapshotId String? - /// GROUP (B) KEEP: Run (co-resident) + /// Run runId String run TaskRun @relation(fields: [runId], references: [id]) runStatus TaskRunStatus @@ -358,20 +378,19 @@ model TaskRunExecutionSnapshot { /// This is the current run attempt number. Users can define how many attempts they want for a run. attemptNumber Int? 
- /// Environment (already plain scalars in the control plane) + /// Environment environmentId String environmentType RuntimeEnvironmentType projectId String organizationId String - /// GROUP (A) FK-FREE: waitpoints that have been completed for this execution. - /// Was the implicit M2M `completedWaitpoints Waitpoint[] @relation("completedWaitpoints")`, - /// now the explicit FK-free join model `CompletedWaitpoint`. + // The control-plane `completedWaitpoints` relation (waitpoints completed for this + // execution) is foreign-key-free here, stored as rows of CompletedWaitpoint. /// An array of waitpoint IDs in the correct order, used for batches completedWaitpointOrder String[] - /// GROUP (B) KEEP: Checkpoint (co-resident) + /// Checkpoint checkpointId String? checkpoint TaskRunCheckpoint? @relation(fields: [checkpointId], references: [id]) @@ -414,6 +433,8 @@ enum TaskRunExecutionStatus { FINISHED } +/// Same columns as the control-plane version, with the project and environment +/// relations kept as scalar-only foreign keys. model TaskRunCheckpoint { id String @id @default(cuid()) @@ -425,10 +446,10 @@ model TaskRunCheckpoint { reason String? metadata String? - // scalarized control-plane FK (was `project @relation`) + /// scalar FK (control-plane `project` relation) projectId String - // scalarized control-plane FK (was `runtimeEnvironment @relation`) + /// scalar FK (control-plane `runtimeEnvironment` relation) runtimeEnvironmentId String executionSnapshot TaskRunExecutionSnapshot[] @@ -445,6 +466,14 @@ enum TaskRunCheckpointType { /// A Waitpoint blocks a run from continuing until it's completed /// If there's a waitpoint blocking a run, it shouldn't be in the queue +/// +/// Same columns and indexes as the control-plane version. 
Every relation is +/// foreign-key-free, because a waitpoint and the run or batch it references may +/// live in different databases: `completedByTaskRunId` and `completedByBatchId` +/// are scalar-only, and the `blockingTaskRuns`, `connectedRuns`, and +/// `completedExecutionSnapshots` back-references are represented by the +/// `TaskRunWaitpoint`, `WaitpointRunConnection`, and `CompletedWaitpoint` join +/// models. The project and environment relations are scalar-only foreign keys. model Waitpoint { id String @id @default(cuid()) @@ -468,28 +497,27 @@ model Waitpoint { /// This is a workaround because Prisma doesn't support partial indexes. inactiveIdempotencyKey String? - /// GROUP (A) FK-FREE: RUN-type completing run, scalar only (a LEGACY token can be completed by a NEW run). + /// If it's a RUN type waitpoint, this is the associated run. + /// scalar FK (control-plane `completedByTaskRun` relation) completedByTaskRunId String? @unique /// If it's a DATETIME type waitpoint, this is the date. /// If it's a MANUAL waitpoint, this can be set as the `timeout`. completedAfter DateTime? - /// GROUP (A) FK-FREE: BATCH-type completing batch, scalar only. + /// If it's a BATCH type waitpoint, this is the associated batch. + /// scalar FK (control-plane `completedByBatch` relation) completedByBatchId String? - // GROUP (A) FK-FREE back-refs dropped: blocking edges live in TaskRunWaitpoint; - // connectedRuns -> WaitpointRunConnection; completing snapshots -> CompletedWaitpoint. - /// When completed, an output can be stored here output String? 
outputType String @default("application/json") outputIsError Boolean @default(false) - // scalarized control-plane FK (was `project @relation`) + /// scalar FK (control-plane `project` relation) projectId String - // scalarized control-plane FK (was `environment @relation`) + /// scalar FK (control-plane `environment` relation) environmentId String createdAt DateTime @default(now()) @@ -524,22 +552,27 @@ enum WaitpointStatus { COMPLETED } +/// The block edge between a run and a waitpoint. Same columns and indexes as the +/// control-plane version, but the run, waitpoint, project, and batch relations +/// are all scalar-only foreign keys: a block edge can connect a run and a +/// waitpoint that live in different databases, so it is never enforced by a +/// foreign key here. model TaskRunWaitpoint { id String @id @default(cuid()) - /// GROUP (A) FK-FREE: the block edge → blocked run (was a TaskRun FK). + /// scalar FK (control-plane `taskRun` relation) taskRunId String - /// GROUP (A) FK-FREE: the block edge → its waitpoint (was a Waitpoint FK). + /// scalar FK (control-plane `waitpoint` relation) waitpointId String - // scalarized control-plane FK (was a project FK) + /// scalar FK (control-plane `project` relation) projectId String /// This span id is completed when the waitpoint is completed. This is used with cached runs (idempotent) spanIdToComplete String? - /// GROUP (A) FK-FREE: associated batch (was a BatchTaskRun FK). + /// scalar FK (control-plane `batch` relation) batchId String? //if there's an associated batch and this isn't set it's for the entire batch //if it is set, it's a specific run in the batch @@ -555,9 +588,11 @@ model TaskRunWaitpoint { @@index([waitpointId]) } -/// GROUP (A) FK-FREE explicit join model — replaces the implicit M2M -/// `_WaitpointRunConnections` (TaskRun.connectedWaitpoints ↔ Waitpoint.connectedRuns). -/// Scalar columns only, no FK, so a run↔waitpoint connection may straddle DBs. 
+/// Explicit join model for the connection between a run and every waitpoint that +/// blocked it (kept for display). This has no control-plane counterpart table: it +/// replaces the implicit many-to-many relation between `TaskRun.connectedWaitpoints` +/// and `Waitpoint.connectedRuns`. Scalar columns only, no foreign keys, so a +/// connection can span both databases. model WaitpointRunConnection { id String @id @default(cuid()) @@ -569,9 +604,11 @@ model WaitpointRunConnection { @@index([waitpointId]) } -/// GROUP (A) FK-FREE explicit join model — replaces the implicit M2M -/// `_completedWaitpoints` (TaskRunExecutionSnapshot.completedWaitpoints ↔ Waitpoint). -/// Scalar columns only, no FK, so a NEW snapshot may record a LEGACY token. +/// Explicit join model recording which waitpoints an execution snapshot completed. +/// This has no control-plane counterpart table: it replaces the implicit +/// many-to-many relation between `TaskRunExecutionSnapshot.completedWaitpoints` +/// and `Waitpoint`. Scalar columns only, no foreign keys, so a snapshot may record +/// a waitpoint that lives in the other database. model CompletedWaitpoint { id String @id @default(cuid()) @@ -583,14 +620,16 @@ model CompletedWaitpoint { @@index([waitpointId]) } +/// Same columns and indexes as the control-plane version, with the environment +/// and project relations kept as scalar-only foreign keys. model WaitpointTag { id String @id @default(cuid()) name String - // scalarized control-plane FK (was `environment @relation`) + /// scalar FK (control-plane `environment` relation) environmentId String - // scalarized control-plane FK (was `project @relation`) + /// scalar FK (control-plane `project` relation) projectId String createdAt DateTime @default(now()) @@ -598,13 +637,16 @@ model WaitpointTag { @@unique([environmentId, name]) } +/// Same columns and indexes as the control-plane version, with the project +/// relation kept as a scalar-only foreign key. 
The control-plane `runs` back- +/// reference (the tag-to-run many-to-many) is dropped. model TaskRunTag { id String @id @default(cuid()) name String friendlyId String @unique - // scalarized control-plane FK (was `project @relation`) + /// scalar FK (control-plane `project` relation) projectId String createdAt DateTime @default(now()) @@ -615,22 +657,24 @@ model TaskRunTag { } /// This is used for triggerAndWait and batchTriggerAndWait. The taskRun is the child task, it points at a parent attempt or a batch +/// +/// Identical to the control-plane version: every relation stays within this +/// database and is kept as a real foreign key. model TaskRunDependency { id String @id @default(cuid()) - /// GROUP (B) KEEP: the child run (co-resident) + /// The child run taskRun TaskRun @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) taskRunId String @unique - /// GROUP (B) KEEP checkpointEvent CheckpointRestoreEvent? @relation(fields: [checkpointEventId], references: [id], onDelete: Cascade, onUpdate: Cascade) checkpointEventId String? @unique - /// GROUP (B) KEEP: an attempt that is dependent on this task run. + /// An attempt that is dependent on this task run. dependentAttempt TaskRunAttempt? @relation(fields: [dependentAttemptId], references: [id]) dependentAttemptId String? - /// GROUP (B) KEEP: a batch run that is dependent on this task run + /// A batch run that is dependent on this task run dependentBatchRun BatchTaskRun? @relation("dependentBatchRun", fields: [dependentBatchRunId], references: [id]) dependentBatchRunId String? @@ -642,26 +686,30 @@ model TaskRunDependency { @@index([dependentBatchRunId]) } +/// Same columns and indexes as the control-plane version. The owning-run relation +/// is kept as a real foreign key. The background-worker, background-worker-task, +/// environment, and queue relations are scalar-only foreign keys. The control- +/// plane `alerts` back-reference is dropped. 
model TaskRunAttempt { id String @id @default(cuid()) number Int @default(0) friendlyId String @unique - /// GROUP (B) KEEP: the owning run (co-resident) + /// The owning run taskRun TaskRun @relation("attempts", fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) taskRunId String - // scalarized control-plane FK (was `backgroundWorker @relation`) + /// scalar FK (control-plane `backgroundWorker` relation) backgroundWorkerId String - // scalarized control-plane FK (was `backgroundWorkerTask @relation`) + /// scalar FK (control-plane `backgroundWorkerTask` relation) backgroundWorkerTaskId String - // scalarized control-plane FK (was `runtimeEnvironment @relation`) + /// scalar FK (control-plane `runtimeEnvironment` relation) runtimeEnvironmentId String - // scalarized control-plane FK (was `queue @relation`) + /// scalar FK (control-plane `queue` relation) queueId String status TaskRunAttemptStatus @default(PENDING) @@ -701,12 +749,20 @@ enum TaskRunAttemptStatus { COMPLETED } +/// Same columns and indexes as the control-plane version. The environment +/// relation is a scalar-only foreign key. The `runsBlocked` and `waitpoints` +/// back-references are foreign-key-free: block edges are rows of +/// `TaskRunWaitpoint`, and batch-completed waitpoints are found via +/// `Waitpoint.completedByBatchId`. The `runs`, `errors`, `items`, +/// `checkpointEvent`, `dependentTaskAttempt`, and `runDependencies` relations +/// stay within this database and are kept as real foreign keys. model BatchTaskRun { id String @id @default(cuid()) friendlyId String @unique idempotencyKey String? idempotencyKeyExpiresAt DateTime? status BatchTaskRunStatus @default(PENDING) + /// scalar FK (control-plane `runtimeEnvironment` relation) runtimeEnvironmentId String /// This only includes new runs, not idempotent runs. runs TaskRun[] @@ -723,12 +779,9 @@ model BatchTaskRun { batchVersion String @default("v1") //engine v2 - /// GROUP (A) FK-FREE: specific run blockers. 
Was `runsBlocked TaskRunWaitpoint[]`; - /// block edges are the rows of `TaskRunWaitpoint` keyed by the scalar `batchId`. - - /// GROUP (A) FK-FREE: waitpoints that are blocked by this batch. - /// Was `waitpoints Waitpoint[]`; the completion back-ref lives on - /// `Waitpoint.completedByBatchId` (scalar). + // The control-plane `runsBlocked` and `waitpoints` back-references are foreign- + // key-free here: block edges are rows of TaskRunWaitpoint keyed by batchId, and + // batch-completed waitpoints are found via Waitpoint.completedByBatchId. // This is for v3 batches /// sealed is set to true once no more items can be added to the batch @@ -763,10 +816,8 @@ model BatchTaskRun { ///all the below properties are engine v1 only items BatchTaskRunItem[] taskIdentifier String? - /// GROUP (B) KEEP checkpointEvent CheckpointRestoreEvent? @relation(fields: [checkpointEventId], references: [id], onDelete: Cascade, onUpdate: Cascade) checkpointEventId String? @unique - /// GROUP (B) KEEP dependentTaskAttempt TaskRunAttempt? @relation(fields: [dependentTaskAttemptId], references: [id], onDelete: Cascade, onUpdate: Cascade) dependentTaskAttemptId String? runDependencies TaskRunDependency[] @relation("dependentBatchRun") @@ -788,20 +839,20 @@ enum BatchTaskRunStatus { } ///Used in engine V1 only +/// +/// Identical to the control-plane version: every relation stays within this +/// database and is kept as a real foreign key. model BatchTaskRunItem { id String @id @default(cuid()) status BatchTaskRunItemStatus @default(PENDING) - /// GROUP (B) KEEP batchTaskRun BatchTaskRun @relation(fields: [batchTaskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) batchTaskRunId String - /// GROUP (B) KEEP taskRun TaskRun @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) taskRunId String - /// GROUP (B) KEEP taskRunAttempt TaskRunAttempt? 
@relation(fields: [taskRunAttemptId], references: [id], onDelete: SetNull, onUpdate: Cascade) taskRunAttemptId String? @@ -822,9 +873,11 @@ enum BatchTaskRunItemStatus { } /// Track individual run creation failures in batch processing (Run Engine v2) +/// +/// Identical to the control-plane version: the batch relation stays within this +/// database and is kept as a real foreign key. model BatchTaskRunError { id String @id @default(cuid()) - /// GROUP (B) KEEP batchTaskRun BatchTaskRun @relation(fields: [batchTaskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) batchTaskRunId String @@ -847,6 +900,9 @@ model BatchTaskRunError { @@index([batchTaskRunId]) } +/// Same columns and indexes as the control-plane version. The run and attempt +/// relations are kept as real foreign keys; the project and environment relations +/// are scalar-only foreign keys. model Checkpoint { id String @id @default(cuid()) @@ -860,19 +916,17 @@ model Checkpoint { events CheckpointRestoreEvent[] - /// GROUP (B) KEEP run TaskRun @relation(fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade) runId String - /// GROUP (B) KEEP attempt TaskRunAttempt @relation(fields: [attemptId], references: [id], onDelete: Cascade, onUpdate: Cascade) attemptId String attemptNumber Int? - // scalarized control-plane FK (was `project @relation`) + /// scalar FK (control-plane `project` relation) projectId String - // scalarized control-plane FK (was `runtimeEnvironment @relation`) + /// scalar FK (control-plane `runtimeEnvironment` relation) runtimeEnvironmentId String createdAt DateTime @default(now()) @@ -887,6 +941,9 @@ enum CheckpointType { KUBERNETES } +/// Same columns and indexes as the control-plane version. The checkpoint, run, +/// and attempt relations are kept as real foreign keys; the project and +/// environment relations are scalar-only foreign keys. 
model CheckpointRestoreEvent { id String @id @default(cuid()) @@ -894,22 +951,19 @@ model CheckpointRestoreEvent { reason String? metadata String? - /// GROUP (B) KEEP checkpoint Checkpoint @relation(fields: [checkpointId], references: [id], onDelete: Cascade, onUpdate: Cascade) checkpointId String - /// GROUP (B) KEEP run TaskRun @relation(fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade) runId String - /// GROUP (B) KEEP attempt TaskRunAttempt @relation(fields: [attemptId], references: [id], onDelete: Cascade, onUpdate: Cascade) attemptId String - // scalarized control-plane FK (was `project @relation`) + /// scalar FK (control-plane `project` relation) projectId String - // scalarized control-plane FK (was `runtimeEnvironment @relation`) + /// scalar FK (control-plane `runtimeEnvironment` relation) runtimeEnvironmentId String taskRunDependency TaskRunDependency? From 5b0ef336b31ef6e719e46a697324f016128e0de1 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Thu, 2 Jul 2026 15:32:11 +0100 Subject: [PATCH 5/8] style(run-ops): apply oxfmt Co-Authored-By: Claude Opus 4.8 (1M context) --- internal-packages/run-ops-database/tsconfig.json | 8 +------- internal-packages/testcontainers/src/utils.ts | 10 +++++++++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/internal-packages/run-ops-database/tsconfig.json b/internal-packages/run-ops-database/tsconfig.json index fda3a5ed806..8b889b0dbb6 100644 --- a/internal-packages/run-ops-database/tsconfig.json +++ b/internal-packages/run-ops-database/tsconfig.json @@ -9,11 +9,5 @@ "noEmit": true, "strict": true }, - "exclude": [ - "node_modules", - "dist", - "generated", - "**/*.test.ts", - "vitest.config.ts" - ] + "exclude": ["node_modules", "dist", "generated", "**/*.test.ts", "vitest.config.ts"] } diff --git a/internal-packages/testcontainers/src/utils.ts b/internal-packages/testcontainers/src/utils.ts index 505a919ed56..b13ab7ea859 100644 --- a/internal-packages/testcontainers/src/utils.ts 
+++ b/internal-packages/testcontainers/src/utils.ts @@ -88,7 +88,15 @@ async function pushPrismaSchema({ // push looks like success and only surfaces much later as a confusing downstream error. const result = await x( prismaBin, - ["db", "push", "--force-reset", "--accept-data-loss", "--skip-generate", "--schema", schemaPath], + [ + "db", + "push", + "--force-reset", + "--accept-data-loss", + "--skip-generate", + "--schema", + schemaPath, + ], { throwOnError: true, nodeOptions: { From 3459b33b6ce1bc783672b608a0393b8f3c762ee6 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Thu, 2 Jul 2026 20:00:37 +0100 Subject: [PATCH 6/8] test(run-ops): strip schema comments before parity regexes; dedupe PG17 fixture bootstrap Co-Authored-By: Claude Opus 4.8 (1M context) --- .../prisma/schema.parity.test.ts | 13 +++- internal-packages/testcontainers/src/index.ts | 60 +++++++------------ 2 files changed, 32 insertions(+), 41 deletions(-) diff --git a/internal-packages/run-ops-database/prisma/schema.parity.test.ts b/internal-packages/run-ops-database/prisma/schema.parity.test.ts index 920def9813f..3598b9c8111 100644 --- a/internal-packages/run-ops-database/prisma/schema.parity.test.ts +++ b/internal-packages/run-ops-database/prisma/schema.parity.test.ts @@ -23,9 +23,16 @@ function readSchema(rel: string) { return readFileSync(resolve(__dirname, rel), "utf8"); } +// Prisma comments (`///` docs and `//` lines) may legitimately mention +// control-plane model names in prose, which would false-match the drift +// regexes below. Strip them so parity assertions only see real schema syntax. 
+function stripComments(schema: string) { + return schema.replace(/\/\/.*$/gm, ""); +} + describe("dedicated run-ops schema parity", () => { it("references no control-plane model as a relation target", () => { - const dedicated = readSchema("./schema.prisma"); + const dedicated = stripComments(readSchema("./schema.prisma")); for (const model of CONTROL_PLANE_MODELS) { // A relation target appears as ` fieldName Model @relation(...)`. A bare // scalar column like `projectId String` is fine; the model TYPE must be absent. @@ -60,7 +67,7 @@ describe("dedicated run-ops schema parity", () => { }); it("keeps the group-(A) waitpoint-block references FK-FREE (scalar columns / explicit FK-free join models)", () => { - const dedicated = readSchema("./schema.prisma"); + const dedicated = stripComments(readSchema("./schema.prisma")); // TaskRunWaitpoint must NOT carry a `@relation` to Waitpoint/TaskRun/BatchTaskRun. const trw = dedicated.match(/model TaskRunWaitpoint \{[\s\S]*?\n\}/)![0]; expect(trw).not.toMatch(/@relation/); @@ -77,7 +84,7 @@ describe("dedicated run-ops schema parity", () => { }); it("keeps the group-(B) co-resident references as real FKs (e.g. TaskRunAttempt.taskRun)", () => { - const dedicated = readSchema("./schema.prisma"); + const dedicated = stripComments(readSchema("./schema.prisma")); const attempt = dedicated.match(/model TaskRunAttempt \{[\s\S]*?\n\}/)![0]; // The attempt->run relation stays a real FK (always co-resident). expect(attempt).toMatch(/taskRun\s+TaskRun\s+@relation/); diff --git a/internal-packages/testcontainers/src/index.ts b/internal-packages/testcontainers/src/index.ts index dd61f8f89b0..1bb9b3706b1 100644 --- a/internal-packages/testcontainers/src/index.ts +++ b/internal-packages/testcontainers/src/index.ts @@ -176,28 +176,30 @@ export const HETERO_PINNED_ICU_COLLATION = "und-x-icu"; // PG17 worker singleton mirroring getWorkerPostgresContainer. 
PG17 supports the ICU cluster locale // provider (PG14 does not - it arrived in PG15), so only this side sets the cluster locale; the real // cross-version guarantee is the per-column COLLATE in the proof test. +async function bootstrapPg17TemplateContainer( + pushSchema: (databaseUrl: string) => Promise +): Promise { + const container = await withCiResourceLimits(new PostgreSqlContainer("docker.io/postgres:17")) + .withCommand(["-c", "listen_addresses=*", "-c", "wal_level=logical"]) + .withEnvironment({ + POSTGRES_INITDB_ARGS: "--locale-provider=icu --icu-locale=en-US --encoding=UTF8", + }) + .start(); + const admin = new PrismaClient({ + datasources: { + db: { url: postgresUriWithDatabase(container.getConnectionUri(), "postgres") }, + }, + }); + await admin.$executeRawUnsafe(`CREATE DATABASE "${POSTGRES_TEMPLATE_DB}"`); + await admin.$disconnect(); + await pushSchema(postgresUriWithDatabase(container.getConnectionUri(), POSTGRES_TEMPLATE_DB)); + return container; +} + let workerPostgresContainer17: Promise | undefined; const getWorkerPostgresContainer17 = () => { if (!workerPostgresContainer17) { - workerPostgresContainer17 = (async () => { - const container = await withCiResourceLimits(new PostgreSqlContainer("docker.io/postgres:17")) - .withCommand(["-c", "listen_addresses=*", "-c", "wal_level=logical"]) - .withEnvironment({ - POSTGRES_INITDB_ARGS: "--locale-provider=icu --icu-locale=en-US --encoding=UTF8", - }) - .start(); - const admin = new PrismaClient({ - datasources: { - db: { url: postgresUriWithDatabase(container.getConnectionUri(), "postgres") }, - }, - }); - await admin.$executeRawUnsafe(`CREATE DATABASE "${POSTGRES_TEMPLATE_DB}"`); - await admin.$disconnect(); - await pushDatabaseSchema( - postgresUriWithDatabase(container.getConnectionUri(), POSTGRES_TEMPLATE_DB) - ); - return container; - })(); + workerPostgresContainer17 = bootstrapPg17TemplateContainer(pushDatabaseSchema); } return workerPostgresContainer17; }; @@ -209,25 +211,7 @@ const 
getWorkerPostgresContainer17 = () => { let runOpsWorkerPostgresContainer17: Promise | undefined; const getRunOpsWorkerPostgresContainer17 = () => { if (!runOpsWorkerPostgresContainer17) { - runOpsWorkerPostgresContainer17 = (async () => { - const container = await withCiResourceLimits(new PostgreSqlContainer("docker.io/postgres:17")) - .withCommand(["-c", "listen_addresses=*", "-c", "wal_level=logical"]) - .withEnvironment({ - POSTGRES_INITDB_ARGS: "--locale-provider=icu --icu-locale=en-US --encoding=UTF8", - }) - .start(); - const admin = new PrismaClient({ - datasources: { - db: { url: postgresUriWithDatabase(container.getConnectionUri(), "postgres") }, - }, - }); - await admin.$executeRawUnsafe(`CREATE DATABASE "${POSTGRES_TEMPLATE_DB}"`); - await admin.$disconnect(); - await pushRunOpsSchema( - postgresUriWithDatabase(container.getConnectionUri(), POSTGRES_TEMPLATE_DB) - ); - return container; - })(); + runOpsWorkerPostgresContainer17 = bootstrapPg17TemplateContainer(pushRunOpsSchema); } return runOpsWorkerPostgresContainer17; }; From ea5149194ecfdb07c03b6ed41c09ba036cae937a Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Fri, 3 Jul 2026 10:21:00 +0100 Subject: [PATCH 7/8] build(run-ops): regenerate pnpm-lock after rebase onto main --- pnpm-lock.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d79a7e880cb..47f25346858 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1393,7 +1393,7 @@ importers: version: 6.0.1 vitest: specifier: 4.1.7 - version: 4.1.7(@opentelemetry/api@1.9.1)(@types/node@20.14.14)(@vitest/coverage-v8@4.1.7)(vite@6.4.2(@types/node@20.14.14)(jiti@2.6.1)(lightningcss@1.29.2)(terser@5.46.1)(tsx@4.22.4)(yaml@2.9.0)) + version: 4.1.7(@opentelemetry/api@1.9.1)(@types/node@22.20.0)(@vitest/coverage-v8@4.1.7)(vite@6.4.2(@types/node@22.20.0)(jiti@2.6.1)(lightningcss@1.29.2)(terser@5.46.1)(tsx@4.22.4)(yaml@2.9.0)) internal-packages/run-store: dependencies: From 
00cfae3396cf3c5230e7b23b2c3f911fbd92c133 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Fri, 3 Jul 2026 11:43:10 +0100 Subject: [PATCH 8/8] docs(run-ops): add RUN_OPS_DATABASE_URL to .env.example Co-Authored-By: Claude Opus 4.8 (1M context) --- .env.example | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.env.example b/.env.example index 4505cb7933c..ec4488d5f63 100644 --- a/.env.example +++ b/.env.example @@ -7,6 +7,10 @@ DATABASE_URL=postgresql://postgres:postgres@localhost:5432/postgres?schema=publi # This sets the URL used for direct connections to the database and should only be needed in limited circumstances # See: https://www.prisma.io/docs/reference/api-reference/prisma-schema-reference#fields:~:text=the%20shadow%20database.-,directUrl,-No DIRECT_URL=${DATABASE_URL} +# Dedicated run-ops database (@internal/run-ops-database). Only needed to run prisma commands +# against it or to enable the run-ops split; start it with `docker compose --profile runops up`. +RUN_OPS_DATABASE_URL=postgresql://postgres:postgres@localhost:5434/postgres?schema=public +RUN_OPS_DATABASE_DIRECT_URL=${RUN_OPS_DATABASE_URL} REMIX_APP_PORT=3030 # Dev-only: stream the webapp's logs over a local telnet/TCP socket (nc localhost 6767). Uncomment to enable. # WEBAPP_TELNET_LOGS_PORT=6767