Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/run-ops-split-webapp-write-path.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Route the webapp write path — trigger/batch run minting, idempotency-key resolution, and run lifecycle writes — through the run store so runs can be created and mutated on the dedicated run-ops database.
7 changes: 5 additions & 2 deletions apps/webapp/app/models/runtimeEnvironment.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,11 @@ export async function disconnectSession(environmentId: string) {
return session;
}

export async function findLatestSession(environmentId: string) {
const session = await $replica.runtimeEnvironmentSession.findFirst({
export async function findLatestSession(
environmentId: string,
client: PrismaClientOrTransaction = $replica
) {
const session = await client.runtimeEnvironmentSession.findFirst({
where: {
environmentId,
},
Expand Down
93 changes: 49 additions & 44 deletions apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
Comment thread
d-cs marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.se
import { claimOrAwait } from "~/v3/mollifier/idempotencyClaim.server";
import { makeResolveMollifierFlag } from "~/v3/mollifier/mollifierGate.server";
import { runStore } from "~/v3/runStore.server";
import { runOpsLegacyPrisma, runOpsNewPrisma } from "~/db.server";
import { isSplitEnabled } from "~/v3/runOpsMigration/splitMode.server";
import { resolveRunIdMintKind } from "~/v3/engineVersion.server";
import { resolveIdempotencyDedupClient } from "./idempotencyResidency.server";
import type { TraceEventConcern, TriggerTaskRequest } from "../types";

// In-memory per-org mollifier-enabled check, shared with `evaluateGate`
Expand Down Expand Up @@ -147,6 +151,28 @@ export class IdempotencyKeyConcern {
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
}

// Probe and clears must hit the DB where the would-be run will physically live.
const dedupClient = await resolveIdempotencyDedupClient(
{
environmentForMint: {
organizationId: request.environment.organizationId,
id: request.environment.id,
orgFeatureFlags: request.environment.organization?.featureFlags,
},
parentRunFriendlyId: request.body.options?.parentRunId,
},
{
isSplitEnabled,
fallbackClient: this.prisma,
newClient: runOpsNewPrisma,
legacyClient: runOpsLegacyPrisma,
resolveMintKind: resolveRunIdMintKind,
// `isMigrated` is intentionally omitted: until a child of a swept
// legacy-id parent can be born on the new DB, the swept-marker override
// would never change the answer, so a child routes by parent id-shape.
}
);

const existingRun = idempotencyKey
? await runStore.findRun(
{
Expand All @@ -159,7 +185,7 @@ export class IdempotencyKeyConcern {
associatedWaitpoint: true,
},
},
this.prisma
dedupClient
)
: undefined;

Expand Down Expand Up @@ -193,7 +219,7 @@ export class IdempotencyKeyConcern {
// Update the existing run to remove the idempotency key
await runStore.clearIdempotencyKey(
{ byId: { runId: existingRun.id, idempotencyKey } },
this.prisma
dedupClient
);

return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
Expand All @@ -210,7 +236,7 @@ export class IdempotencyKeyConcern {
// Update the existing run to remove the idempotency key
await runStore.clearIdempotencyKey(
{ byId: { runId: existingRun.id, idempotencyKey } },
this.prisma
dedupClient
);

return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
Expand Down Expand Up @@ -249,7 +275,6 @@ export class IdempotencyKeyConcern {
? `${event.traceparent.spanId}:${event.spanId}`
: event.spanId;

//block run with waitpoint
await this.engine.blockRunWithWaitpoint({
runId: RunId.fromFriendlyId(parentRunId),
waitpoints: associatedWaitpoint!.id,
Expand All @@ -262,7 +287,7 @@ export class IdempotencyKeyConcern {
: undefined,
projectId: request.environment.projectId,
organizationId: request.environment.organizationId,
tx: this.prisma,
tx: dedupClient,
});
}
);
Expand All @@ -277,24 +302,13 @@ export class IdempotencyKeyConcern {
// (resumeParentOnCompletion) — that path bypasses the gate entirely
// and its existing PG-side dedup is sufficient.
//
// Also gated on the same per-org mollifier flag the gate uses: when
// `TRIGGER_MOLLIFIER_ENABLED=1` globally for staged rollout, the buffer
// singleton is constructed and `claimOrAwait` would otherwise issue a
// Redis SETNX for EVERY idempotency-keyed trigger — including orgs
// that haven't opted in. Those orgs never enter the mollify branch
// (the gate always returns pass_through for them), so there's no
// buffer activity to serialise against; PG's unique constraint
// already deduplicates concurrent same-key races. Resolving the org
// flag is a pure in-memory read of `Organization.featureFlags` — no
// DB query, same predicate the gate uses — keeping the claim's Redis
// RTT off the hot path for non-opted-in orgs during incremental
// rollout.
// Match the gate's bypass list (`mollifierGate.server.ts:158-175`).
// debounce + oneTimeUseToken triggers always return pass_through from
// the gate, so claiming a Redis SETNX here is wasted RTT on the
// trigger hot path. Excluding them keeps the claim aligned with the
// gate — if the gate would never mollify the request, there's no
// buffer to serialise against.
// Gated on the same per-org mollifier flag the gate uses, and the same
// bypass list (debounce + oneTimeUseToken): if the gate would never mollify
// the request, there's no buffer to serialise against and PG's unique
// constraint already deduplicates concurrent same-key races. Skipping the
// claim's Redis SETNX keeps its RTT off the hot path for those requests
// during staged rollout. The org-flag check is a pure in-memory read of
// `Organization.featureFlags`, no DB query.
const claimEligible =
!request.body.options?.resumeParentOnCompletion &&
!request.body.options?.debounce &&
Expand Down Expand Up @@ -336,7 +350,7 @@ export class IdempotencyKeyConcern {
taskIdentifier: request.taskId,
},
{ include: { associatedWaitpoint: true } },
this.prisma
dedupClient
);
if (writerRun) {
return { isCached: true, run: writerRun };
Expand All @@ -350,27 +364,18 @@ export class IdempotencyKeyConcern {
if (buffered) {
return { isCached: true, run: buffered };
}
// Claim resolved to a runId nothing can find — the run was
// genuinely lost (claimant errored after publish, drain failed,
// or both the PG row and buffer entry TTL'd out). This is
// terminal, not transient: `lookupIdempotency` self-heals a
// dangling pointer, and `ack` keeps the entry hash as a
// read-fallback past the PG write, so re-polling cannot conjure
// a run that is gone. Falling through to a fresh trigger is the
// correct recovery.
// Claim resolved to a runId nothing can find — the run was genuinely
// lost (claimant errored after publish, or both the PG row and buffer
// entry TTL'd out). Terminal, not transient, so falling through to a
// fresh trigger is the correct recovery.
//
// Why falling through claimless is safe (no duplicate runs):
// concurrent triggers that also fall through here converge on a
// single run via the same dedup backstops the claim layer relies
// on — the PG unique constraint on the idempotency key
// (RunDuplicateIdempotencyKeyError → retry resolves to the
// winner) for the pass-through path, and `accept`'s idempotency
// SETNX (`duplicate_idempotency`) for the mollify path. Once the
// first fall-through commits a run, later callers find it via the
// writer-PG / buffer lookups above despite the stale `resolved:`
// slot, which the slot's TTL clears within ~30s. The residual
// cost is a few redundant (deduped) trigger attempts in that
// window, not duplicate runs.
// Falling through claimless doesn't duplicate runs: concurrent
// fall-throughs converge on one run via the same dedup backstops the
// claim layer relies on — PG's unique constraint on the idempotency key
// (pass-through path) and `accept`'s SETNX (mollify path). Once the
// first commits, later callers find it via the writer-PG / buffer
// lookups above despite the stale `resolved:` slot (cleared by its ~30s
// TTL). Residual cost is a few deduped trigger attempts, not dup runs.
logger.warn("idempotency claim resolved but runId not findable", {
envId: request.environment.id,
taskIdentifier: request.taskId,
Expand Down
100 changes: 100 additions & 0 deletions apps/webapp/app/runEngine/concerns/idempotencyResidency.server.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { describe, expect, it } from "vitest";
import { RunId } from "@trigger.dev/core/v3/isomorphic";
import {
resolveIdempotencyDedupClient,
type ResolveIdempotencyClientDeps,
} from "./idempotencyResidency.server";

// Distinct sentinel objects so we can assert WHICH client was selected by reference.
const FALLBACK = { __tag: "fallback" } as never;
const NEW_CLIENT = { __tag: "new" } as never;
const LEGACY_CLIENT = { __tag: "legacy" } as never;

/**
 * Builds a full `ResolveIdempotencyClientDeps` for a test case.
 *
 * The baseline has the split enabled, wires in the three sentinel clients, mints
 * "ksuid", leaves `isMigrated` unset, and classifies ids purely by length
 * (27 chars → "NEW", 25 chars → "LEGACY", anything else throws). Every key present
 * in `over` replaces the corresponding baseline entry.
 */
function makeDeps(over: Partial<ResolveIdempotencyClientDeps>): ResolveIdempotencyClientDeps {
  const baseline: ResolveIdempotencyClientDeps = {
    isSplitEnabled: async () => true,
    fallbackClient: FALLBACK,
    newClient: NEW_CLIENT,
    legacyClient: LEGACY_CLIENT,
    resolveMintKind: async () => "ksuid",
    classify: (id) => {
      switch (id.length) {
        case 27:
          return "NEW";
        case 25:
          return "LEGACY";
        default:
          throw new Error(`unclassifiable: ${id.length}`);
      }
    },
    isMigrated: undefined,
  };

  return { ...baseline, ...over };
}

const env = { organizationId: "org_1", id: "env_1", orgFeatureFlags: {} };

// Suite for `resolveIdempotencyDedupClient`. Every case builds its deps with `makeDeps`
// and asserts, by reference (`toBe`), which sentinel client the resolver returned.
describe("resolveIdempotencyDedupClient", () => {
// Split off: the resolver returns the fallback client.
it("returns the fallback client unchanged when split is disabled", async () => {
const client = await resolveIdempotencyDedupClient(
{ environmentForMint: env, parentRunFriendlyId: undefined },
makeDeps({ isSplitEnabled: async () => false })
);
expect(client).toBe(FALLBACK);
});

// Root runs (no parent id) are routed by the mint kind reported for the environment.
it("routes a root run to the NEW client when the env mints ksuid", async () => {
const client = await resolveIdempotencyDedupClient(
{ environmentForMint: env, parentRunFriendlyId: undefined },
makeDeps({ resolveMintKind: async () => "ksuid" })
);
expect(client).toBe(NEW_CLIENT);
});

it("routes a root run to the LEGACY client when the env mints cuid", async () => {
const client = await resolveIdempotencyDedupClient(
{ environmentForMint: env, parentRunFriendlyId: undefined },
makeDeps({ resolveMintKind: async () => "cuid" })
);
expect(client).toBe(LEGACY_CLIENT);
});

// Child runs: the parent id is classified by the stub `classify` from `makeDeps`
// (27 chars → "NEW", 25 chars → "LEGACY"); the mint kind is set to the opposite value
// to show that it is ignored when a parent id is present.
it("routes a child to the NEW client when the ksuid parent is NEW-resident", async () => {
const ksuidParent = RunId.toFriendlyId("a".repeat(27));
const client = await resolveIdempotencyDedupClient(
{ environmentForMint: env, parentRunFriendlyId: ksuidParent },
makeDeps({ resolveMintKind: async () => "cuid" }) // mint flag must NOT win for a child
);
expect(client).toBe(NEW_CLIENT);
});

it("routes a child to the LEGACY client when the cuid parent is LEGACY-resident", async () => {
const cuidParent = RunId.toFriendlyId("b".repeat(25));
const client = await resolveIdempotencyDedupClient(
{ environmentForMint: env, parentRunFriendlyId: cuidParent },
makeDeps({ resolveMintKind: async () => "ksuid" }) // mint flag must NOT win for a child
);
expect(client).toBe(LEGACY_CLIENT);
});

// `isMigrated` override: only consulted for a LEGACY-classified parent; a truthy answer
// sends the child to the NEW client, a falsy one leaves it on LEGACY.
it("routes a swept (migrated) cuid-parent child to the NEW client", async () => {
const cuidParent = RunId.toFriendlyId("c".repeat(25));
const client = await resolveIdempotencyDedupClient(
{ environmentForMint: env, parentRunFriendlyId: cuidParent },
makeDeps({ isMigrated: async () => true })
);
expect(client).toBe(NEW_CLIENT);
});

it("routes a non-migrated cuid-parent child to the LEGACY client even when isMigrated is provided", async () => {
const cuidParent = RunId.toFriendlyId("d".repeat(25));
const client = await resolveIdempotencyDedupClient(
{ environmentForMint: env, parentRunFriendlyId: cuidParent },
makeDeps({ isMigrated: async () => false })
);
expect(client).toBe(LEGACY_CLIENT);
});

// NOTE(review): presumably the id survives `RunId.fromFriendlyId` and is then rejected by
// the length-based stub `classify`; either failure path returns the fallback client.
it("falls back to the fallback client when a present parent id is unclassifiable", async () => {
const client = await resolveIdempotencyDedupClient(
{ environmentForMint: env, parentRunFriendlyId: "run_not-a-valid-length" },
makeDeps({})
);
expect(client).toBe(FALLBACK);
});
});
56 changes: 56 additions & 0 deletions apps/webapp/app/runEngine/concerns/idempotencyResidency.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { ownerEngine, RunId, type Residency } from "@trigger.dev/core/v3/isomorphic";
import type { PrismaClientOrTransaction } from "@trigger.dev/database";

/**
 * Kind of run id reported by `resolveMintKind`. `resolveIdempotencyDedupClient` maps
 * "ksuid" to the new client and any other value to the legacy client.
 */
type MintKind = "cuid" | "ksuid";

/** Injected dependencies for `resolveIdempotencyDedupClient`. */
export type ResolveIdempotencyClientDeps = {
/** Awaited first; when it resolves to `false` the fallback client is returned as-is. */
isSplitEnabled: () => Promise<boolean>;
/**
 * Returned when the split is disabled, or when a parent run id is present but cannot be
 * converted to an internal id or classified.
 */
fallbackClient: PrismaClientOrTransaction;
/** Returned for "NEW" residency, a "ksuid" mint kind, or a migrated LEGACY parent. */
newClient: PrismaClientOrTransaction;
/** Returned for every other residency / mint kind. */
legacyClient: PrismaClientOrTransaction;
/** Called with `environmentForMint` only when no parent run id is supplied. */
resolveMintKind: (environment: {
organizationId: string;
id: string;
orgFeatureFlags?: unknown;
}) => Promise<MintKind>;
/** Maps an internal run id to its residency; defaults to `ownerEngine` when omitted. */
classify?: (id: string) => Residency;
/**
 * Optional override consulted only when the parent classifies as "LEGACY"; a truthy
 * result routes the lookup to the new client instead.
 */
isMigrated?: (id: string) => Promise<boolean>;
};

/**
 * Picks the Prisma client that idempotency-key reads and writes should be issued against.
 *
 * Resolution order:
 * 1. `deps.isSplitEnabled()` resolves falsy → `deps.fallbackClient`.
 * 2. No `parentRunFriendlyId` → `deps.resolveMintKind(environmentForMint)` decides:
 *    "ksuid" → `deps.newClient`, anything else → `deps.legacyClient`.
 * 3. Otherwise the parent id is converted with `RunId.fromFriendlyId` and classified with
 *    `deps.classify` (defaulting to `ownerEngine`). If either step throws, the result is
 *    `deps.fallbackClient`. A "NEW" parent → `deps.newClient`. A "LEGACY" parent for which
 *    `deps.isMigrated` is provided and resolves truthy → `deps.newClient`. Everything else
 *    → `deps.legacyClient`.
 *
 * Rejections from `isSplitEnabled`, `resolveMintKind` and `isMigrated` are not caught here
 * and propagate to the caller.
 *
 * @param args - The environment used for the mint-kind lookup and the optional parent run.
 * @param deps - Injected clients and predicates; see `ResolveIdempotencyClientDeps`.
 * @returns The client selected by the rules above.
 */
export async function resolveIdempotencyDedupClient(
  args: {
    environmentForMint: { organizationId: string; id: string; orgFeatureFlags?: unknown };
    parentRunFriendlyId: string | undefined;
  },
  deps: ResolveIdempotencyClientDeps
): Promise<PrismaClientOrTransaction> {
  const splitOn = await deps.isSplitEnabled();
  if (!splitOn) {
    return deps.fallbackClient;
  }

  const toResidency = deps.classify ?? ownerEngine;
  const parentFriendlyId = args.parentRunFriendlyId;

  // Root run: there is no parent to follow, so the environment's mint kind decides.
  if (!parentFriendlyId) {
    const mintKind = await deps.resolveMintKind(args.environmentForMint);
    return mintKind === "ksuid" ? deps.newClient : deps.legacyClient;
  }

  // Child run: follow the parent. A parent id that cannot be parsed or classified
  // yields the fallback client rather than an error.
  let parentId: string;
  let parentResidency: Residency;
  try {
    parentId = RunId.fromFriendlyId(parentFriendlyId);
    parentResidency = toResidency(parentId);
  } catch {
    return deps.fallbackClient;
  }

  if (parentResidency === "NEW") {
    return deps.newClient;
  }

  // The migrated override is only consulted for LEGACY parents, and only when supplied.
  if (parentResidency === "LEGACY" && deps.isMigrated) {
    const migrated = await deps.isMigrated(parentId);
    if (migrated) {
      return deps.newClient;
    }
  }

  return deps.legacyClient;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import type { PrismaReplicaClient } from "~/db.server";
import {
$replica as defaultLegacyReplica,
runOpsNewReplica as defaultNewClient,
runOpsSplitReadEnabled as defaultSplitReadEnabled,
} from "~/db.server";
import { readThroughRun } from "~/v3/runOpsMigration/readThrough.server";

/**
 * Optional per-call overrides for the dependencies that
 * `resolveWaitpointThroughReadThrough` forwards to `readThroughRun`. Any of
 * `newClient`, `legacyReplica` or `splitEnabled` left unset (or nullish) is filled from
 * the `defaults` object; `isPastRetention` is forwarded as given, possibly `undefined`.
 */
type ResolveWaitpointDeps = {
newClient?: PrismaReplicaClient;
legacyReplica?: PrismaReplicaClient;
splitEnabled?: boolean;
isPastRetention?: (id: string) => boolean;
};

// Safe defaults matching the deps `complete`/`callback` pass, so a bare caller still fans
// out to the dedicated run-ops replica (NEW-resident waitpoints) before control-plane.
//
// Shape of the fully-populated fallback values; every field is required so that each
// `deps` entry left unset by the caller has a concrete value to fall back to.
export type ResolveWaitpointReadThroughDefaults = {
newClient: PrismaReplicaClient;
legacyReplica: PrismaReplicaClient;
splitEnabled: boolean;
};

// Fallback values used by `resolveWaitpointThroughReadThrough` when the caller supplies no
// `defaults`; all three come from the `~/db.server` exports imported at the top of the file.
const productionDefaults: ResolveWaitpointReadThroughDefaults = {
newClient: defaultNewClient,
legacyReplica: defaultLegacyReplica,
splitEnabled: defaultSplitReadEnabled,
};

/**
 * Reads a waitpoint through `readThroughRun`, using the same `read` callback for both the
 * new client and the legacy replica.
 *
 * The waitpoint id is passed to `readThroughRun` as its `runId`. Each of `splitEnabled`,
 * `newClient` and `legacyReplica` is taken from `opts.deps` when set, otherwise from
 * `opts.defaults`, otherwise from the module-level production defaults.
 * `opts.deps.isPastRetention` is forwarded unchanged.
 *
 * @param opts.waitpointId - Id of the waitpoint to look up.
 * @param opts.environmentId - Environment id forwarded to `readThroughRun`.
 * @param opts.read - Performs the actual read against whichever client it is handed.
 * @param opts.deps - Optional per-call dependency overrides.
 * @param opts.defaults - Optional replacement for the production defaults.
 * @returns The value read when the result's source is "new" or "legacy-replica";
 *   `null` for any other source.
 */
export async function resolveWaitpointThroughReadThrough<T>(opts: {
  waitpointId: string;
  environmentId: string;
  read: (client: PrismaReplicaClient) => Promise<T | null>;
  deps?: ResolveWaitpointDeps;
  defaults?: ResolveWaitpointReadThroughDefaults;
}): Promise<T | null> {
  const baseline = opts.defaults ?? productionDefaults;
  const overrides = opts.deps;

  const outcome = await readThroughRun({
    runId: opts.waitpointId,
    environmentId: opts.environmentId,
    readNew: (newClient) => opts.read(newClient),
    readLegacy: (legacyReplica) => opts.read(legacyReplica),
    deps: {
      splitEnabled: overrides?.splitEnabled ?? baseline.splitEnabled,
      newClient: overrides?.newClient ?? baseline.newClient,
      legacyReplica: overrides?.legacyReplica ?? baseline.legacyReplica,
      isPastRetention: overrides?.isPastRetention,
    },
  });

  // Only these two sources carry a value; every other source is reported as "not found".
  if (outcome.source === "new" || outcome.source === "legacy-replica") {
    return outcome.value;
  }
  return null;
}
Loading
Loading