Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/run-ops-split-route-read-routing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Route dashboard and API run/waitpoint reads through the run store, and resolve public wait-token requests across both backing stores, so runs and tokens are found regardless of which store they reside on.
37 changes: 17 additions & 20 deletions apps/webapp/app/components/admin/debugRun.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ function DebugRunData(props: UseDataFunctionReturn<typeof loader>) {

function DebugRunDataEngineV1({
run,
environment,
queueConcurrencyLimit,
queueCurrentConcurrency,
envConcurrencyLimit,
Expand Down Expand Up @@ -121,7 +122,7 @@ function DebugRunDataEngineV1({
<Property.Value className="flex items-center gap-2">
<ClipboardField
value={withPrefix(
keys.queueKey(run.runtimeEnvironment, run.queue, run.concurrencyKey ?? undefined)
keys.queueKey(environment, run.queue, run.concurrencyKey ?? undefined)
)}
variant="tertiary/small"
iconButton
Expand All @@ -133,7 +134,7 @@ function DebugRunDataEngineV1({
<Property.Value className="flex items-center gap-2">
<ClipboardField
value={`ZRANGE ${withPrefix(
keys.queueKey(run.runtimeEnvironment, run.queue, run.concurrencyKey ?? undefined)
keys.queueKey(environment, run.queue, run.concurrencyKey ?? undefined)
)} 0 -1`}
variant="tertiary/small"
iconButton
Expand All @@ -146,7 +147,7 @@ function DebugRunDataEngineV1({
<ClipboardField
value={withPrefix(
keys.queueCurrentConcurrencyKey(
run.runtimeEnvironment,
environment,
run.queue,
run.concurrencyKey ?? undefined
)
Expand All @@ -163,7 +164,7 @@ function DebugRunDataEngineV1({
<ClipboardField
value={`SMEMBERS ${withPrefix(
keys.queueCurrentConcurrencyKey(
run.runtimeEnvironment,
environment,
run.queue,
run.concurrencyKey ?? undefined
)
Expand All @@ -185,7 +186,7 @@ function DebugRunDataEngineV1({
<ClipboardField
value={withPrefix(
keys.queueReserveConcurrencyKeyFromQueue(
keys.queueKey(run.runtimeEnvironment, run.queue, run.concurrencyKey ?? undefined)
keys.queueKey(environment, run.queue, run.concurrencyKey ?? undefined)
)
)}
variant="tertiary/small"
Expand All @@ -200,7 +201,7 @@ function DebugRunDataEngineV1({
<ClipboardField
value={`SMEMBERS ${withPrefix(
keys.queueReserveConcurrencyKeyFromQueue(
keys.queueKey(run.runtimeEnvironment, run.queue, run.concurrencyKey ?? undefined)
keys.queueKey(environment, run.queue, run.concurrencyKey ?? undefined)
)
)}`}
variant="tertiary/small"
Expand All @@ -218,7 +219,7 @@ function DebugRunDataEngineV1({
<Property.Label>Queue concurrency limit key</Property.Label>
<Property.Value className="flex items-center gap-2">
<ClipboardField
value={withPrefix(keys.queueConcurrencyLimitKey(run.runtimeEnvironment, run.queue))}
value={withPrefix(keys.queueConcurrencyLimitKey(environment, run.queue))}
variant="tertiary/small"
iconButton
/>
Expand All @@ -228,9 +229,7 @@ function DebugRunDataEngineV1({
<Property.Label>GET queue concurrency limit</Property.Label>
<Property.Value className="flex items-center gap-2">
<ClipboardField
value={`GET ${withPrefix(
keys.queueConcurrencyLimitKey(run.runtimeEnvironment, run.queue)
)}`}
value={`GET ${withPrefix(keys.queueConcurrencyLimitKey(environment, run.queue))}`}
variant="tertiary/small"
iconButton
/>
Expand All @@ -246,7 +245,7 @@ function DebugRunDataEngineV1({
<Property.Label>Env current concurrency key</Property.Label>
<Property.Value className="flex items-center gap-2">
<ClipboardField
value={withPrefix(keys.envCurrentConcurrencyKey(run.runtimeEnvironment))}
value={withPrefix(keys.envCurrentConcurrencyKey(environment))}
variant="tertiary/small"
iconButton
/>
Expand All @@ -256,7 +255,7 @@ function DebugRunDataEngineV1({
<Property.Label>Get env current concurrency</Property.Label>
<Property.Value className="flex items-center gap-2">
<ClipboardField
value={`SMEMBERS ${withPrefix(keys.envCurrentConcurrencyKey(run.runtimeEnvironment))}`}
value={`SMEMBERS ${withPrefix(keys.envCurrentConcurrencyKey(environment))}`}
variant="tertiary/small"
iconButton
/>
Expand All @@ -272,7 +271,7 @@ function DebugRunDataEngineV1({
<Property.Label>Env reserve concurrency key</Property.Label>
<Property.Value className="flex items-center gap-2">
<ClipboardField
value={withPrefix(keys.envReserveConcurrencyKey(run.runtimeEnvironment.id))}
value={withPrefix(keys.envReserveConcurrencyKey(environment.id))}
variant="tertiary/small"
iconButton
/>
Expand All @@ -282,9 +281,7 @@ function DebugRunDataEngineV1({
<Property.Label>Get env reserve concurrency</Property.Label>
<Property.Value className="flex items-center gap-2">
<ClipboardField
value={`SMEMBERS ${withPrefix(
keys.envReserveConcurrencyKey(run.runtimeEnvironment.id)
)}`}
value={`SMEMBERS ${withPrefix(keys.envReserveConcurrencyKey(environment.id))}`}
variant="tertiary/small"
iconButton
/>
Expand All @@ -300,7 +297,7 @@ function DebugRunDataEngineV1({
<Property.Label>Env concurrency limit key</Property.Label>
<Property.Value className="flex items-center gap-2">
<ClipboardField
value={withPrefix(keys.envConcurrencyLimitKey(run.runtimeEnvironment))}
value={withPrefix(keys.envConcurrencyLimitKey(environment))}
variant="tertiary/small"
iconButton
/>
Expand All @@ -310,7 +307,7 @@ function DebugRunDataEngineV1({
<Property.Label>GET env concurrency limit</Property.Label>
<Property.Value className="flex items-center gap-2">
<ClipboardField
value={`GET ${withPrefix(keys.envConcurrencyLimitKey(run.runtimeEnvironment))}`}
value={`GET ${withPrefix(keys.envConcurrencyLimitKey(environment))}`}
variant="tertiary/small"
iconButton
/>
Expand All @@ -326,7 +323,7 @@ function DebugRunDataEngineV1({
<Property.Label>Shared queue key</Property.Label>
<Property.Value className="flex items-center gap-2">
<ClipboardField
value={`GET ${withPrefix(keys.envSharedQueueKey(run.runtimeEnvironment))}`}
value={`GET ${withPrefix(keys.envSharedQueueKey(environment))}`}
variant="tertiary/small"
iconButton
/>
Expand All @@ -337,7 +334,7 @@ function DebugRunDataEngineV1({
<Property.Value className="flex items-center gap-2">
<ClipboardField
value={`ZRANGEBYSCORE ${withPrefix(
keys.envSharedQueueKey(run.runtimeEnvironment)
keys.envSharedQueueKey(environment)
)} -inf ${Date.now()} WITHSCORES`}
variant="tertiary/small"
iconButton
Expand Down
31 changes: 13 additions & 18 deletions apps/webapp/app/routes/@.runs.$runParam.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { redirect, type LoaderFunctionArgs } from "@remix-run/server-runtime";
import { z } from "zod";
import { prisma } from "~/db.server";
import { runStore } from "~/v3/runStore.server";
import { controlPlaneResolver } from "~/v3/runOpsMigration/controlPlaneResolver.server";
import { redirectWithErrorMessage } from "~/models/message.server";
import { requireUser } from "~/services/session.server";
import { impersonate, rootPath, v3RunPath, v3RunSpanPath } from "~/utils/pathBuilder";
Expand Down Expand Up @@ -36,21 +37,7 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
{
select: {
spanId: true,
runtimeEnvironment: {
select: {
slug: true,
},
},
project: {
select: {
slug: true,
organization: {
select: {
slug: true,
},
},
},
},
runtimeEnvironmentId: true,
},
},
prisma
Expand Down Expand Up @@ -90,10 +77,18 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
});
}

const environment = await controlPlaneResolver.resolveAuthenticatedEnv(run.runtimeEnvironmentId);

if (!environment) {
return redirectWithErrorMessage(rootPath(), request, "Run doesn't exist", {
ephemeral: false,
});
}

const path = v3RunSpanPath(
{ slug: run.project.organization.slug },
{ slug: run.project.slug },
{ slug: run.runtimeEnvironment.slug },
{ slug: environment.organization.slug },
{ slug: environment.project.slug },
{ slug: environment.slug },
{ friendlyId: runParam },
{ spanId: run.spanId }
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ import { findProjectBySlug } from "~/models/project.server";
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
import { type BatchList, BatchListPresenter } from "~/presenters/v3/BatchListPresenter.server";
import { requireUserId } from "~/services/session.server";
import {
$replica,
runOpsNewReplicaClient,
runOpsLegacyReplica,
runOpsSplitReadEnabled,
type PrismaClientOrTransaction,
} from "~/db.server";
import {
docsPath,
EnvironmentParamSchema,
Expand Down Expand Up @@ -90,7 +97,12 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
};
const filters = BatchListFilters.parse(s);

const presenter = new BatchListPresenter();
const presenter = new BatchListPresenter(undefined, undefined, {
runOpsNew: runOpsNewReplicaClient as unknown as PrismaClientOrTransaction,
runOpsLegacyReplica: runOpsLegacyReplica as unknown as PrismaClientOrTransaction,
controlPlaneReplica: $replica as unknown as PrismaClientOrTransaction,
splitEnabled: runOpsSplitReadEnabled,
});
const list = await presenter.call({
userId,
projectId: project.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import { useProject } from "~/hooks/useProject";
import { findProjectBySlug } from "~/models/project.server";
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
import { WaitpointPresenter } from "~/presenters/v3/WaitpointPresenter.server";
import {
runOpsNewReplicaClient,
runOpsLegacyReplica,
runOpsSplitReadEnabled,
type PrismaClientOrTransaction,
} from "~/db.server";
import { requireUserId } from "~/services/session.server";
import { cn } from "~/utils/cn";
import { EnvironmentParamSchema, v3WaitpointTokensPath } from "~/utils/pathBuilder";
Expand Down Expand Up @@ -45,7 +51,11 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
}

try {
const presenter = new WaitpointPresenter();
const presenter = new WaitpointPresenter(undefined, undefined, {
newClient: runOpsNewReplicaClient as unknown as PrismaClientOrTransaction,
legacyReplica: runOpsLegacyReplica as unknown as PrismaClientOrTransaction,
splitEnabled: runOpsSplitReadEnabled,
});
const result = await presenter.call({
friendlyId: waitpointParam,
environmentId: environment.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ import { findProjectBySlug } from "~/models/project.server";
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
import { WaitpointListPresenter } from "~/presenters/v3/WaitpointListPresenter.server";
import { requireUserId } from "~/services/session.server";
import {
runOpsNewReplicaClient,
runOpsLegacyReplica,
runOpsSplitReadEnabled,
type PrismaClientOrTransaction,
} from "~/db.server";
import { docsPath, EnvironmentParamSchema, v3WaitpointTokenPath } from "~/utils/pathBuilder";

export const meta: MetaFunction = () => {
Expand Down Expand Up @@ -88,7 +94,11 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
}

try {
const presenter = new WaitpointListPresenter();
const presenter = new WaitpointListPresenter(undefined, undefined, {
runOpsNew: runOpsNewReplicaClient as unknown as PrismaClientOrTransaction,
runOpsLegacyReplica: runOpsLegacyReplica as unknown as PrismaClientOrTransaction,
splitEnabled: runOpsSplitReadEnabled,
});
const result = await presenter.call({
environment,
...searchParams,
Expand Down
12 changes: 3 additions & 9 deletions apps/webapp/app/routes/api.v1.batches.$batchId.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import { anyResource, createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { runStore } from "~/v3/runStore.server";

const ParamsSchema = z.object({
batchId: z.string(),
Expand All @@ -13,14 +13,8 @@ export const loader = createLoaderApiRoute(
allowJWT: true,
corsStrategy: "all",
findResource: (params, auth) => {
return $replica.batchTaskRun.findFirst({
where: {
friendlyId: params.batchId,
runtimeEnvironmentId: auth.environment.id,
},
include: {
errors: true,
},
return runStore.findBatchTaskRunByFriendlyId(params.batchId, auth.environment.id, {
include: { errors: true },
});
},
authorization: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ const { action, loader } = createActionApiRoute(
}
}

// Step 1: Create the waitpoint
// Create the waitpoint. Co-locate it with the owning run (run-ops split) so a ksuid
// run's input-stream waitpoint lands on the run's DB and its block edge resolves.
const result = await engine.createManualWaitpoint({
runId: run.id,
environmentId: authentication.environment.id,
projectId: authentication.environment.projectId,
idempotencyKey: body.idempotencyKey,
Expand All @@ -88,7 +90,7 @@ const { action, loader } = createActionApiRoute(
tags: bodyTags,
});

// Step 2: Cache the mapping in Redis for fast lookup from .send()
// Cache the mapping in Redis for fast lookup from .send()
const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
await setInputStreamWaitpoint(
run.friendlyId,
Expand All @@ -97,7 +99,7 @@ const { action, loader } = createActionApiRoute(
ttlMs && ttlMs > 0 ? ttlMs : undefined
);

// Step 3: Check if data was already sent to this input stream (race condition handling).
// Check if data was already sent to this input stream (race condition handling).
// If .send() landed before .wait(), the data is in the S2 stream but no waitpoint
// existed to complete. We check from the client's last known position.
if (!result.isCached) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ const { action, loader } = createActionApiRoute(
}
}

// Step 1: Create the waitpoint.
// Create the waitpoint. Co-locate it with the owning run (run-ops split) so a ksuid
// run's session-stream waitpoint lands on the run's DB and its block edge resolves.
const result = await engine.createManualWaitpoint({
runId: run.id,
environmentId: authentication.environment.id,
projectId: authentication.environment.projectId,
idempotencyKey: body.idempotencyKey,
Expand All @@ -109,7 +111,7 @@ const { action, loader } = createActionApiRoute(
tags: bodyTags,
});

// Step 2: Register the waitpoint on the session channel so the next
// Register the waitpoint on the session channel so the next
// append fires it. Keyed by (environmentId, addressingKey, io) — the
// canonical string for the row, scoped to the environment because
// externalIds are only unique per environment. The append handler
Expand All @@ -124,7 +126,7 @@ const { action, loader } = createActionApiRoute(
ttlMs && ttlMs > 0 ? ttlMs : undefined
);

// Step 3: Race-check. If a record landed on the channel before this
// Race-check. If a record landed on the channel before this
// .wait() call, complete the waitpoint synchronously with that data
// and remove the pending registration.
if (!result.isCached) {
Expand Down
Loading
Loading